http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
new file mode 100644
index 0000000..b12952a
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * This base class allows tests to use the MiniYARNCluster.
+ * The cluster is re-used for all tests.
+ *
+ * This class is located in a different package which is built after flink-dist. This way,
+ * we can use the YARN uberjar of flink to start a Flink YARN session.
+ */
+public abstract class YarnTestBase {
+       private static final Logger LOG = LoggerFactory.getLogger(YarnTestBase.class);
+
+       private final static PrintStream originalStdout = System.out;
+       private final static PrintStream originalStderr = System.err;
+
+
+       // Temp directory which is deleted after the unit test.
+       private static TemporaryFolder tmp = new TemporaryFolder();
+
+       protected static MiniYARNCluster yarnCluster = null;
+
+       protected static File flinkUberjar;
+
+       protected static final Configuration yarnConfiguration;
+       static {
+               yarnConfiguration = new YarnConfiguration();
+               yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+               yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096); // 4096 is the available memory anyways
+               yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+               yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
+               yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+               yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
+               yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
+               yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+               yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster,
+               // so we have to change the number of cores for testing.
+       }
+
+       // This code is taken from: http://stackoverflow.com/a/7201825/568695
+       // it changes the environment variables of this JVM. Use only for testing purposes!
+       private static void setEnv(Map<String, String> newenv) {
+               try {
+                       Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
+                       Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
+                       theEnvironmentField.setAccessible(true);
+                       Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
+                       env.putAll(newenv);
+                       Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
+                       theCaseInsensitiveEnvironmentField.setAccessible(true);
+                       Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
+                       cienv.putAll(newenv);
+               } catch (NoSuchFieldException e) {
+                       try {
+                               Class[] classes = Collections.class.getDeclaredClasses();
+                               Map<String, String> env = System.getenv();
+                               for (Class cl : classes) {
+                                       if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+                                               Field field = cl.getDeclaredField("m");
+                                               field.setAccessible(true);
+                                               Object obj = field.get(env);
+                                               Map<String, String> map = (Map<String, String>) obj;
+                                               map.clear();
+                                               map.putAll(newenv);
+                                       }
+                               }
+                       } catch (Exception e2) {
+                               throw new RuntimeException(e2);
+                       }
+               } catch (Exception e1) {
+                       throw new RuntimeException(e1);
+               }
+       }
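For illustration, a minimal sketch of calling setEnv() from a test in this class; the directory value is hypothetical:

    Map<String, String> env = new HashMap<String, String>(System.getenv());
    env.put("YARN_CONF_DIR", "/tmp/yarn-conf"); // hypothetical path
    setEnv(env); // mutates the JVM-wide environment via reflection
    Assert.assertEquals("/tmp/yarn-conf", System.getenv("YARN_CONF_DIR"));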
+
+       /**
+        * Sleep a bit between the tests (we are re-using the YARN cluster for the tests).
+        */
+       @After
+       public void sleep() {
+               try {
+                       Thread.sleep(500);
+               } catch (InterruptedException e) {
+                       Assert.fail("Should not happen");
+               }
+       }
+
+       @Before
+       public void checkClusterEmpty() throws IOException, YarnException {
+               YarnClient yarnClient = YarnClient.createYarnClient();
+               yarnClient.init(yarnConfiguration);
+               yarnClient.start();
+               List<ApplicationReport> apps = yarnClient.getApplications();
+               for(ApplicationReport app : apps) {
+                       if(app.getYarnApplicationState() != YarnApplicationState.FINISHED) {
+                               Assert.fail("There is at least one application on the cluster that is not finished");
+                       }
+               }
+       }
+
+       /**
+        * Locate a file or directory.
+        */
+       public static File findFile(String startAt, FilenameFilter fnf) {
+               File root = new File(startAt);
+               String[] files = root.list();
+               if(files == null) {
+                       return null;
+               }
+               for(String file : files) {
+
+                       File f = new File(startAt + File.separator + file);
+                       if(f.isDirectory()) {
+                               File r = findFile(f.getAbsolutePath(), fnf);
+                               if(r != null) {
+                                       return r;
+                               }
+                       } else if (fnf.accept(f.getParentFile(), f.getName())) {
+                               return f;
+                       }
+
+               }
+               return null;
+       }
+
+       /**
+        * Filter to find root dir of the flink-yarn dist.
+        */
+       public static class RootDirFilenameFilter implements FilenameFilter {
+               @Override
+               public boolean accept(File dir, String name) {
+                       return name.endsWith("yarn-uberjar.jar") && dir.toString().contains("/lib");
+               }
+       }
+       public static class ContainsName implements FilenameFilter {
+               private String name;
+               private String excludeInPath = null;
+
+               public ContainsName(String name) {
+                       this.name = name;
+               }
+
+               public ContainsName(String name, String excludeInPath) {
+                       this.name = name;
+                       this.excludeInPath = excludeInPath;
+               }
+
+               @Override
+               public boolean accept(File dir, String name) {
+                       if(excludeInPath == null) {
+                               return name.contains(this.name);
+                       } else {
+                               return name.contains(this.name) && !dir.toString().contains(excludeInPath);
+                       }
+               }
+       }
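For illustration, a sketch of how findFile() and the ContainsName filter combine; the start directory and exclude string are hypothetical:

    // recursively search the build output for flink-conf.yaml,
    // skipping any match whose parent path contains "test-classes"
    File conf = findFile("..", new ContainsName("flink-conf.yaml", "test-classes"));
    Assert.assertNotNull("flink-conf.yaml not found", conf);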
+
+       public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException {
+               tmp.create();
+               File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml");
+
+               FileWriter writer = new FileWriter(yarnSiteXML);
+               yarnConf.writeXml(writer);
+               writer.flush();
+               writer.close();
+               return yarnSiteXML;
+       }
+
+       public static void startYARNWithConfig(Configuration conf) {
+               flinkUberjar = findFile(".", new RootDirFilenameFilter());
+               Assert.assertNotNull(flinkUberjar);
+               String flinkDistRootDir = flinkUberjar.getParentFile().getParent();
+
+               if (!flinkUberjar.exists()) {
+                       Assert.fail("Unable to locate yarn-uberjar.jar");
+               }
+
+               try {
+                       LOG.info("Starting up MiniYARN cluster");
+                       if (yarnCluster == null) {
+                               yarnCluster = new MiniYARNCluster(YARNSessionFIFOIT.class.getName(), 2, 1, 1);
+
+                               yarnCluster.init(conf);
+                               yarnCluster.start();
+                       }
+
+                       Map<String, String> map = new HashMap<String, String>(System.getenv());
+                       File flinkConfFilePath = findFile(flinkDistRootDir, new ContainsName("flink-conf.yaml"));
+                       Assert.assertNotNull(flinkConfFilePath);
+                       map.put("FLINK_CONF_DIR", flinkConfFilePath.getParent());
+                       File yarnConfFile = writeYarnSiteConfigXML(conf);
+                       map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
+                       setEnv(map);
+
+                       Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
+               } catch (Exception ex) {
+                       ex.printStackTrace();
+                       LOG.error("setup failure", ex);
+                       Assert.fail();
+               }
+       }
+
+       /**
+        * Default @BeforeClass implementation. Override this to pass a different configuration.
+        */
+       @BeforeClass
+       public static void setup() {
+               startYARNWithConfig(yarnConfiguration);
+       }
+
+       // -------------------------- Runner -------------------------- //
+
+       private static ByteArrayOutputStream outContent;
+       private static ByteArrayOutputStream errContent;
+       enum RunTypes {
+               YARN_SESSION, CLI_FRONTEND
+       }
+
+       protected void runWithArgs(String[] args, String expect, RunTypes type) {
+               LOG.info("Running with args "+ Arrays.toString(args));
+
+               outContent = new ByteArrayOutputStream();
+               errContent = new ByteArrayOutputStream();
+               System.setOut(new PrintStream(outContent));
+               System.setErr(new PrintStream(errContent));
+
+
+               final int START_TIMEOUT_SECONDS = 60;
+
+               Runner runner = new Runner(args, type);
+               runner.start();
+
+               boolean expectedStringSeen = false;
+               for(int second = 0; second <  START_TIMEOUT_SECONDS; second++) {
+                       try {
+                               Thread.sleep(1000);
+                       } catch (InterruptedException e) {
+                               Assert.fail("Interruption not expected");
+                       }
+                       // check output for correct TaskManager startup.
+                       if(outContent.toString().contains(expect)
+                                       || errContent.toString().contains(expect)) {
+                               expectedStringSeen = true;
+                               LOG.info("Found expected output in redirected streams");
+                               // send "stop" command to command line interface
+                               runner.sendStop();
+                               // wait for the thread to stop
+                               try {
+                                       runner.join(1000);
+                               } catch (InterruptedException e) {
+                                       LOG.warn("Interrupted while stopping runner", e);
+                               }
+                               LOG.warn("stopped");
+                               break;
+                       }
+                       // check if thread died
+                       if(!runner.isAlive()) {
+                               sendOutput();
+                               Assert.fail("Runner thread died before the test was finished. Return value = "+runner.getReturnValue());
+                       }
+               }
+
+               sendOutput();
+               Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
+                               "expected string did not show up", expectedStringSeen);
+               LOG.info("Test was successful");
+       }
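For illustration, a sketch of a concrete IT case that could drive this runner; the CLI arguments and expected string are plausible values, not taken from this commit:

    public class YARNSessionExampleIT extends YarnTestBase {
        @org.junit.Test
        public void testStartSession() {
            // start a session with one 512 MB TaskManager and wait until the
            // expected line shows up on the redirected stdout/stderr
            runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-n", "1", "-tm", "512"},
                    "Flink JobManager is now running", RunTypes.YARN_SESSION);
        }
    }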
+
+       private static void sendOutput() {
+               System.setOut(originalStdout);
+               System.setErr(originalStderr);
+
+               LOG.info("Sending stdout content through logger: \n\n"+outContent.toString()+"\n\n");
+               LOG.info("Sending stderr content through logger: \n\n"+errContent.toString()+"\n\n");
+       }
+
+       public static class Runner extends Thread {
+               private final String[] args;
+               private int returnValue;
+               private RunTypes type;
+               private FlinkYarnSessionCli yCli;
+
+               public Runner(String[] args, RunTypes type) {
+                       this.args = args;
+                       this.type = type;
+               }
+
+               public int getReturnValue() {
+                       return returnValue;
+               }
+
+               @Override
+               public void run() {
+                       switch(type) {
+                               case YARN_SESSION:
+                                       yCli = new FlinkYarnSessionCli("", "");
+                                       returnValue = yCli.run(args);
+                                       break;
+                               case CLI_FRONTEND:
+                                       CliFrontend cli = new CliFrontend();
+                                       returnValue = cli.parseParameters(args);
+                                       break;
+                               default:
+                                       throw new RuntimeException("Unknown type " + type);
+                       }
+
+                       if(returnValue != 0) {
+                               Assert.fail("The YARN session returned with non-zero value="+returnValue);
+                       }
+               }
+
+               public void sendStop() {
+                       if(yCli != null) {
+                               yCli.stop();
+                       }
+               }
+       }
+
+       // -------------------------- Tear down -------------------------- //
+
+       @AfterClass
+       public static void tearDown() {
+               //shutdown YARN cluster
+               if (yarnCluster != null) {
+                       LOG.info("shutdown MiniYarn cluster");
+                       yarnCluster.stop();
+                       yarnCluster = null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties b/flink-yarn-tests/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..26d6a12
--- /dev/null
+++ b/flink-yarn-tests/src/test/resources/log4j-test.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, file
+
+# Log all infos to the console (the appender is named "file" but writes to stdout)
+log4j.appender.file=org.apache.log4j.ConsoleAppender
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file
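With this ConversionPattern, a rendered log line looks roughly like the following (illustrative):

    14:25:03,421 INFO  org.apache.flink.yarn.YarnTestBase                            - Starting up MiniYARN cluster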

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
new file mode 100644
index 0000000..1569f15
--- /dev/null
+++ b/flink-yarn/pom.xml
@@ -0,0 +1,228 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-parent</artifactId>
+               <version>0.9-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+       
+       <artifactId>flink-yarn</artifactId>
+       <name>flink-yarn</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>hadoop-core</artifactId>
+                                       <groupId>org.apache.hadoop</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.typesafe.akka</groupId>
+                       <artifactId>akka-actor_2.10</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.typesafe.akka</groupId>
+                       <artifactId>akka-remote_2.10</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.typesafe.akka</groupId>
+                       <artifactId>akka-camel_2.10</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.camel</groupId>
+                       <artifactId>camel-stream</artifactId>
+                       <version>2.14.0</version>
+               </dependency>
+
+               <!--  guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading -->
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-yarn-client</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-common</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-hdfs</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-mapreduce-client-core</artifactId>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- Scala Compiler -->
+                       <plugin>
+                               <groupId>net.alchim31.maven</groupId>
+                               <artifactId>scala-maven-plugin</artifactId>
+                               <version>3.1.4</version>
+                               <executions>
+                                       <!-- Run scala compiler in the process-resources phase, so that dependencies on
+                                               scala classes can be resolved later in the (Java) compile phase -->
+                                       <execution>
+                                               <id>scala-compile-first</id>
+                                               <phase>process-resources</phase>
+                                               <goals>
+                                                       <goal>compile</goal>
+                                               </goals>
+                                       </execution>
+
+                                       <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+                                                scala classes can be resolved later in the (Java) test-compile phase -->
+                                       <execution>
+                                               <id>scala-test-compile</id>
+                                               <phase>process-test-resources</phase>
+                                               <goals>
+                                                       <goal>testCompile</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <jvmArgs>
+                                               <jvmArg>-Xms128m</jvmArg>
+                                               <jvmArg>-Xmx512m</jvmArg>
+                                       </jvmArgs>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Eclipse Integration -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-eclipse-plugin</artifactId>
+                               <version>2.8</version>
+                               <configuration>
+                                       <downloadSources>true</downloadSources>
+                                       <projectnatures>
+                                               <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+                                               <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+                                       </projectnatures>
+                                       <buildcommands>
+                                               <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+                                       </buildcommands>
+                                       <classpathContainers>
+                                               <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+                                               <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+                                       </classpathContainers>
+                                       <excludes>
+                                               <exclude>org.scala-lang:scala-library</exclude>
+                                               <exclude>org.scala-lang:scala-compiler</exclude>
+                                       </excludes>
+                                       <sourceIncludes>
+                                               <sourceInclude>**/*.scala</sourceInclude>
+                                               <sourceInclude>**/*.java</sourceInclude>
+                                       </sourceIncludes>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Adding scala source directories to build path -->
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>build-helper-maven-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <!-- Add src/main/scala to eclipse build path -->
+                                       <execution>
+                                               <id>add-source</id>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>add-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               <source>src/main/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                                       <!-- Add src/test/scala to eclipse build path -->
+                                       <execution>
+                                               <id>add-test-source</id>
+                                               <phase>generate-test-sources</phase>
+                                               <goals>
+                                                       <goal>add-test-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               <source>src/test/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <plugin>
+                               <groupId>org.scalastyle</groupId>
+                               <artifactId>scalastyle-maven-plugin</artifactId>
+                               <version>0.5.0</version>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>check</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <verbose>false</verbose>
+                                       <failOnViolation>true</failOnViolation>
+                                       <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                                       <failOnWarning>false</failOnWarning>
+                                       <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+                                       <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+                                       <configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+                                       <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+                                       <outputEncoding>UTF-8</outputEncoding>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+</project>
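As a usage note, this module can be built on its own with a standard Maven invocation from the parent directory, e.g. (illustrative):

    mvn clean package -pl flink-yarn -am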

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
new file mode 100644
index 0000000..c922963
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -0,0 +1,653 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * All classes in this package contain code taken from
+ * https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
+ * and
+ * https://github.com/hortonworks/simple-yarn-app
+ * and
+ * https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
+ *
+ * The Flink jar is uploaded to HDFS by this client.
+ * The application master and all the TaskManager containers get the jar file downloaded
+ * by YARN into their local fs.
+ *
+ */
+public class FlinkYarnClient extends AbstractFlinkYarnClient {
+       private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
+
+       /**
+        * Constants,
+        * all starting with ENV_ are used as environment variables to pass values from the Client
+        * to the Application Master.
+        */
+       public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
+       public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
+       public final static String ENV_APP_ID = "_APP_ID";
+       public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
+       public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
+       public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
+       public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
+       public static final String ENV_SLOTS = "_SLOTS";
+       public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
+
+       private static final String DEFAULT_QUEUE_NAME = "default";
+
+
+       /**
+        * Minimum memory requirements, checked by the Client.
+        */
+       private static final int MIN_JM_MEMORY = 128;
+       private static final int MIN_TM_MEMORY = 128;
+
+       private Configuration conf;
+       private YarnClient yarnClient;
+       private YarnClientApplication yarnApplication;
+
+
+       /**
+        * Files (usually in a distributed file system) used for the YARN session of Flink.
+        * Contains configuration files and jar files.
+        */
+       private Path sessionFilesDir;
+
+       /**
+        * If the user has specified a different number of slots, we store them here
+        */
+       private int slots = -1;
+
+       private int jobManagerMemoryMb = 512;
+
+       private int taskManagerMemoryMb = 512;
+
+       private int taskManagerCount = 1;
+
+       private String yarnQueue = DEFAULT_QUEUE_NAME;
+
+       private String configurationDirectory;
+
+       private Path flinkConfigurationPath;
+
+       private Path flinkLoggingConfigurationPath; // optional
+
+       private Path flinkJarPath;
+
+       private String dynamicPropertiesEncoded;
+
+       private List<File> shipFiles = new ArrayList<File>();
+
+
+       public FlinkYarnClient() {
+               // Check if security is enabled
+               if(UserGroupInformation.isSecurityEnabled()) {
+                       throw new RuntimeException("Flink YARN client does not have security support right now. "
+                                       + "File a bug, we will fix it asap");
+               }
+               conf = Utils.initializeYarnConfiguration();
+               if(this.yarnClient == null) {
+                       // Create yarnClient
+                       yarnClient = YarnClient.createYarnClient();
+                       yarnClient.init(conf);
+                       yarnClient.start();
+               }
+       }
+
+       @Override
+       public void setJobManagerMemory(int memoryMb) {
+               if(memoryMb < MIN_JM_MEMORY) {
+                       throw new IllegalArgumentException("The JobManager memory is below the minimum required memory amount "
+                                       + "of "+MIN_JM_MEMORY+" MB");
+               }
+               this.jobManagerMemoryMb = memoryMb;
+       }
+
+       @Override
+       public void setTaskManagerMemory(int memoryMb) {
+               if(memoryMb < MIN_TM_MEMORY) {
+                       throw new IllegalArgumentException("The TaskManager memory is below the minimum required memory amount "
+                                       + "of "+MIN_TM_MEMORY+" MB");
+               }
+               this.taskManagerMemoryMb = memoryMb;
+       }
+
+       @Override
+       public void setTaskManagerSlots(int slots) {
+               if(slots <= 0) {
+                       throw new IllegalArgumentException("Number of TaskManager slots must be positive");
+               }
+               this.slots = slots;
+       }
+
+       @Override
+       public int getTaskManagerSlots() {
+               return this.slots;
+       }
+
+       @Override
+       public void setQueue(String queue) {
+               this.yarnQueue = queue;
+       }
+
+       @Override
+       public void setLocalJarPath(Path localJarPath) {
+               if(!localJarPath.toString().endsWith("jar")) {
+                       throw new IllegalArgumentException("The passed jar path ('"+localJarPath+"') does not end with the 'jar' extension");
+               }
+               this.flinkJarPath = localJarPath;
+       }
+
+       @Override
+       public void setConfigurationFilePath(Path confPath) {
+               flinkConfigurationPath = confPath;
+       }
+
+       public void setConfigurationDirectory(String configurationDirectory) {
+               this.configurationDirectory = configurationDirectory;
+       }
+
+       @Override
+       public void setFlinkLoggingConfigurationPath(Path logConfPath) {
+               flinkLoggingConfigurationPath = logConfPath;
+       }
+
+       @Override
+       public Path getFlinkLoggingConfigurationPath() {
+               return flinkLoggingConfigurationPath;
+       }
+
+       @Override
+       public void setTaskManagerCount(int tmCount) {
+               if(tmCount < 1) {
+                       throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
+               }
+               this.taskManagerCount = tmCount;
+       }
+
+       @Override
+       public int getTaskManagerCount() {
+               return this.taskManagerCount;
+       }
+
+       @Override
+       public void setShipFiles(List<File> shipFiles) {
+               this.shipFiles.addAll(shipFiles);
+       }
+
+       public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
+               this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
+       }
+
+       @Override
+       public String getDynamicPropertiesEncoded() {
+               return this.dynamicPropertiesEncoded;
+       }
+
+
+       public void isReadyForDeployment() throws YarnDeploymentException {
+               if(taskManagerCount <= 0) {
+                       throw new YarnDeploymentException("TaskManager count must be positive");
+               }
+               if(this.flinkJarPath == null) {
+                       throw new YarnDeploymentException("The Flink jar path is null");
+               }
+               if(this.configurationDirectory == null) {
+                       throw new YarnDeploymentException("Configuration directory not set");
+               }
+               if(this.flinkConfigurationPath == null) {
+                       throw new YarnDeploymentException("Configuration path not set");
+               }
+               }
+
+       }
+
+       public static boolean allocateResource(int[] nodeManagers, int toAllocate) {
+               for(int i = 0; i < nodeManagers.length; i++) {
+                       if(nodeManagers[i] >= toAllocate) {
+                               nodeManagers[i] -= toAllocate;
+                               return true;
+                       }
+               }
+               return false;
+       }
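For illustration, a self-contained walk-through of the first-fit logic above, with made-up NodeManager sizes:

    // NodeManagers with 2048, 1024 and 4096 MB free; place two 1536 MB containers.
    int[] free = {2048, 1024, 4096};
    allocateResource(free, 1536); // fits on the first NM -> {512, 1024, 4096}
    allocateResource(free, 1536); // falls through to the third NM -> {512, 1024, 2560}
    System.out.println(Arrays.toString(free)); // prints [512, 1024, 2560]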
+
+       /**
+        * This method will block until the ApplicationMaster/JobManager have been
+        * deployed on YARN.
+        */
+       @Override
+       public AbstractFlinkYarnCluster deploy(String clusterName) throws Exception {
+               isReadyForDeployment();
+
+               LOG.info("Using values:");
+               LOG.info("\tTaskManager count = " + taskManagerCount);
+               LOG.info("\tJobManager memory = " + jobManagerMemoryMb);
+               LOG.info("\tTaskManager memory = " + taskManagerMemoryMb);
+
+               // Create application via yarnClient
+               yarnApplication = yarnClient.createApplication();
+               GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
+
+               // ------------------ Check if the specified queue exists --------------
+
+               List<QueueInfo> queues = yarnClient.getAllQueues();
+               if(queues.size() > 0) { // check only if there are queues configured.
+                       boolean queueFound = false;
+                       for (QueueInfo queue : queues) {
+                               if (queue.getQueueName().equals(this.yarnQueue)) {
+                                       queueFound = true;
+                                       break;
+                               }
+                       }
+                       if (!queueFound) {
+                               String queueNames = "";
+                               for (QueueInfo queue : queues) {
+                                       queueNames += queue.getQueueName() + ", ";
+                               }
+                               throw new YarnDeploymentException("The specified queue '" + this.yarnQueue + "' does not exist. " +
+                                               "Available queues: " + queueNames);
+                       }
+               } else {
+                       LOG.debug("The YARN cluster does not have any queues configured");
+               }
+
+               // ------------------ Check if the YARN Cluster has the requested resources --------------
+
+               // the yarnMinAllocationMB specifies the smallest possible container allocation size.
+               // all allocations below this value are automatically set to this value.
+               final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
+               if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
+                       LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
+                                       + "The value of 'yarn.scheduler.minimum-allocation-mb' is '"+yarnMinAllocationMB+"'. Please increase the memory size. " +
+                                       "YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
+                                       "you requested will start.");
+               }
+
+               // set the memory to minAllocationMB to do the next checks correctly
+               if(jobManagerMemoryMb < yarnMinAllocationMB) {
+                       jobManagerMemoryMb =  yarnMinAllocationMB;
+               }
+               if(taskManagerMemoryMb < yarnMinAllocationMB) {
+                       taskManagerMemoryMb =  yarnMinAllocationMB;
+               }
+
+               Resource maxRes = appResponse.getMaximumResourceCapability();
+               final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
+               if(jobManagerMemoryMb > maxRes.getMemory() ) {
+                       failSessionDuringDeployment();
+                       throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
+                                       + "Maximum Memory: "+maxRes.getMemory() + "MB Requested: "+jobManagerMemoryMb+"MB. " + NOTE);
+               }
+
+               if(taskManagerMemoryMb > maxRes.getMemory() ) {
+                       failSessionDuringDeployment();
+                       throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
+                                       + "Maximum Memory: " + maxRes.getMemory() + " Requested: "+taskManagerMemoryMb + "MB. " + NOTE);
+               }
+
+
+               int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
+               ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+               if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+                       failSessionDuringDeployment();
+                       throw new YarnDeploymentException("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
+                                       + "There are currently only " + freeClusterMem.totalFreeMemory+"MB available.");
+
+               }
+               if( taskManagerMemoryMb > freeClusterMem.containerLimit) {
+                       failSessionDuringDeployment();
+                       throw new YarnDeploymentException("The requested amount of memory for the TaskManagers ("+taskManagerMemoryMb+"MB) is more than "
+                                       + "the largest possible YARN container: "+freeClusterMem.containerLimit);
+               }
+               if( jobManagerMemoryMb > freeClusterMem.containerLimit) {
+                       failSessionDuringDeployment();
+                       throw new YarnDeploymentException("The requested amount of memory for the JobManager ("+jobManagerMemoryMb+"MB) is more than "
+                                       + "the largest possible YARN container: "+freeClusterMem.containerLimit);
+               }
+
+               // ----------------- check if the requested containers fit into the cluster.
+
+               int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
+               // first, allocate the jobManager somewhere.
+               if(!allocateResource(nmFree, jobManagerMemoryMb)) {
+                       failSessionDuringDeployment();
+                       throw new YarnDeploymentException("Unable to find a NodeManager that can fit the JobManager/Application master. " +
+                                       "The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: "+Arrays.toString(freeClusterMem.nodeManagersFree));
+               }
+               // allocate TaskManagers
+               for(int i = 0; i < taskManagerCount; i++) {
+                       if(!allocateResource(nmFree, taskManagerMemoryMb)) {
+                               failSessionDuringDeployment();
+                               throw new YarnDeploymentException("There is not enough memory available in the YARN cluster. " +
+                                               "The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
+                                               "NodeManagers available: "+Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
+                                               "After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
+                                               "the following NodeManagers are available: " + Arrays.toString(nmFree) );
+                       }
+               }
+
+               // ------------------ Prepare Application Master Container  ------------------------------
+
+               // respect custom JVM options in the YAML file
+               final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+
+               String logbackFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
+               boolean hasLogback = new File(logbackFile).exists();
+               String log4jFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
+
+               boolean hasLog4j = new File(log4jFile).exists();
+               if(hasLogback) {
+                       shipFiles.add(new File(logbackFile));
+               }
+               if(hasLog4j) {
+                       shipFiles.add(new File(log4jFile));
+               }
+
+               // Set up the container launch context for the application master
+               ContainerLaunchContext amContainer = Records
+                               .newRecord(ContainerLaunchContext.class);
+
+               String amCommand = "$JAVA_HOME/bin/java"
+                                       + " -Xmx"+Utils.calculateHeapSize(jobManagerMemoryMb)+"M " +javaOpts;
+
+               if(hasLogback || hasLog4j) {
+                       amCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-main.log\"";
+               }
+
+               if(hasLogback) {
+                       amCommand += " -Dlogback.configurationFile=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
+               }
+               if(hasLog4j) {
+                       amCommand += " -Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
+               }
+
+               amCommand       += " "+ApplicationMaster.class.getName()+" "
+                                       + " 1>"
+                                       + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
+                                       + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
+               amContainer.setCommands(Collections.singletonList(amCommand));
+
+               LOG.debug("Application Master start command: "+amCommand);
+
+               // initialize HDFS
+               // Copy the application master jar to the filesystem
+               // Create a local resource to point to the destination jar path
+               final FileSystem fs = FileSystem.get(conf);
+
+               // hard coded check for the GoogleHDFS client because it's not overriding the getScheme() method.
+               if( !fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
+                               fs.getScheme().startsWith("file")) {
+                       LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+                                       + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values. "
+                                       + "The Flink YARN client needs to store its files in a distributed file system");
+               }
+
+               // Set-up ApplicationSubmissionContext for the application
+               ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
+               final ApplicationId appId = appContext.getApplicationId();
+
+               // Setup jar for ApplicationMaster
+               LocalResource appMasterJar = Records.newRecord(LocalResource.class);
+               LocalResource flinkConf = Records.newRecord(LocalResource.class);
+               Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
+               Path remotePathConf = Utils.setupLocalResource(conf, fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
+               Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
+               localResources.put("flink.jar", appMasterJar);
+               localResources.put("flink-conf.yaml", flinkConf);
+
+
+               // setup security tokens (code from apache storm)
+               final Path[] paths = new Path[3 + shipFiles.size()];
+               StringBuffer envShipFileList = new StringBuffer();
+               // upload ship files
+               for (int i = 0; i < shipFiles.size(); i++) {
+                       File shipFile = shipFiles.get(i);
+                       LocalResource shipResources = Records.newRecord(LocalResource.class);
+                       Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
+                       paths[3 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
+                                       shipLocalPath, shipResources, fs.getHomeDirectory());
+                       localResources.put(shipFile.getName(), shipResources);
+
+                       envShipFileList.append(paths[3 + i]);
+                       if(i+1 < shipFiles.size()) {
+                               envShipFileList.append(',');
+                       }
+               }
+
+               paths[0] = remotePathJar;
+               paths[1] = remotePathConf;
+               sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
+               FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+               fs.setPermission(sessionFilesDir, permission); // set permission for path.
+               Utils.setTokensFor(amContainer, paths, this.conf);
+
+
+               amContainer.setLocalResources(localResources);
+               fs.close();
+
+               // Setup CLASSPATH for ApplicationMaster
+               Map<String, String> appMasterEnv = new HashMap<String, 
String>();
+               Utils.setupEnv(conf, appMasterEnv);
+               // set configuration values
+               appMasterEnv.put(FlinkYarnClient.ENV_TM_COUNT, 
String.valueOf(taskManagerCount));
+               appMasterEnv.put(FlinkYarnClient.ENV_TM_MEMORY, 
String.valueOf(taskManagerMemoryMb));
+               appMasterEnv.put(FlinkYarnClient.FLINK_JAR_PATH, 
remotePathJar.toString() );
+               appMasterEnv.put(FlinkYarnClient.ENV_APP_ID, appId.toString());
+               appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_HOME_DIR, 
fs.getHomeDirectory().toString());
+               appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_SHIP_FILES, 
envShipFileList.toString() );
+               appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_USERNAME, 
UserGroupInformation.getCurrentUser().getShortUserName());
+               appMasterEnv.put(FlinkYarnClient.ENV_SLOTS, 
String.valueOf(slots));
+               if(dynamicPropertiesEncoded != null) {
+                       
appMasterEnv.put(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES, 
dynamicPropertiesEncoded);
+               }
+
+               // all file system accesses are done; release the handle
+               fs.close();
+
+               amContainer.setEnvironment(appMasterEnv);
+
+               // Set up resource type requirements for ApplicationMaster
+               Resource capability = Records.newRecord(Resource.class);
+               capability.setMemory(jobManagerMemoryMb);
+               capability.setVirtualCores(1);
+
+               if(clusterName == null) {
+                       clusterName = "Flink session with "+taskManagerCount+" 
TaskManagers";
+               }
+
+               appContext.setApplicationName(clusterName); // application name
+               appContext.setApplicationType("Apache Flink");
+               appContext.setAMContainerSpec(amContainer);
+               appContext.setResource(capability);
+               appContext.setQueue(yarnQueue);
+
+
+               LOG.info("Submitting application master " + appId);
+               yarnClient.submitApplication(appContext);
+
+               LOG.info("Waiting for the cluster to be allocated");
+               int waittime = 0;
+               loop: while( true ) {
+                       ApplicationReport report = 
yarnClient.getApplicationReport(appId);
+                       YarnApplicationState appState = 
report.getYarnApplicationState();
+                       switch(appState) {
+                               case FAILED:
+                               case FINISHED:
+                               case KILLED:
+                                       throw new YarnDeploymentException("The 
YARN application unexpectedly switched to state "
+                                                       + appState +" during 
deployment. \n" +
+                                                       "Diagnostics from YARN: 
"+report.getDiagnostics() + "\n" +
+                                                       "If log aggregation is 
enabled on your cluster, use this command to further invesitage the issue:\n" +
+                                                       "yarn logs 
-applicationId "+appId);
+                                       //break ..
+                               case RUNNING:
+                                       LOG.info("YARN application has been 
deployed successfully.");
+                                       break loop;
+                               default:
+                                       LOG.info("Deploying cluster, current 
state "+appState);
+                                       if(waittime > 60000) {
+                                               LOG.info("Deployment took more 
than 60 seconds. Please check if the requested resources are available in the 
YARN cluster");
+                                       }
+
+                       }
+                       waittime += 1000;
+                       Thread.sleep(1000);
+               }
+               // the Flink cluster is deployed in YARN; return a handle representing it
+               return new FlinkYarnCluster(yarnClient, appId, conf, 
sessionFilesDir);
+       }
+
+       /**
+        * Kills the YARN application and stops the YARN client.
+        *
+        * Use this method to clean up if the application failed before it was properly deployed.
+        */
+       private void failSessionDuringDeployment() {
+               LOG.info("Killing YARN application");
+
+               try {
+                       
yarnClient.killApplication(yarnApplication.getNewApplicationResponse().getApplicationId());
+               } catch (Exception e) {
+                       // we only log a debug message here because the "killApplication" call is a best-effort
+                       // call (we don't know if the application has been deployed when the error occurred).
+                       LOG.debug("Error while killing YARN application", e);
+               }
+               yarnClient.stop();
+       }
+
+
+       private static class ClusterResourceDescription {
+               public final int totalFreeMemory;
+               public final int containerLimit;
+               public final int[] nodeManagersFree;
+
+               public ClusterResourceDescription(int totalFreeMemory, int 
containerLimit, int[] nodeManagersFree) {
+                       this.totalFreeMemory = totalFreeMemory;
+                       this.containerLimit = containerLimit;
+                       this.nodeManagersFree = nodeManagersFree;
+               }
+       }
+
+       private ClusterResourceDescription 
getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, 
IOException {
+               List<NodeReport> nodes = 
yarnClient.getNodeReports(NodeState.RUNNING);
+
+               int totalFreeMemory = 0;
+               int containerLimit = 0;
+               int[] nodeManagersFree = new int[nodes.size()];
+
+               for(int i = 0; i < nodes.size(); i++) {
+                       NodeReport rep = nodes.get(i);
+                       int free = rep.getCapability().getMemory() - 
(rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
+                       nodeManagersFree[i] = free;
+                       totalFreeMemory += free;
+                       if(free > containerLimit) {
+                               containerLimit = free;
+                       }
+               }
+               return new ClusterResourceDescription(totalFreeMemory, 
containerLimit, nodeManagersFree);
+       }
+
+
+
+       public String getClusterDescription() throws Exception {
+
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               PrintStream ps = new PrintStream(baos);
+
+               YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
+
+               ps.append("NodeManagers in the Cluster " + 
metrics.getNumNodeManagers());
+               List<NodeReport> nodes = 
yarnClient.getNodeReports(NodeState.RUNNING);
+               final String format = "|%-16s |%-16s %n";
+               ps.printf("|Property         |Value          %n");
+               ps.println("+---------------------------------------+");
+               int totalMemory = 0;
+               int totalCores = 0;
+               for(NodeReport rep : nodes) {
+                       final Resource res = rep.getCapability();
+                       totalMemory += res.getMemory();
+                       totalCores += res.getVirtualCores();
+                       ps.format(format, "NodeID", rep.getNodeId());
+                       ps.format(format, "Memory", res.getMemory()+" MB");
+                       ps.format(format, "vCores", res.getVirtualCores());
+                       ps.format(format, "HealthReport", 
rep.getHealthReport());
+                       ps.format(format, "Containers", rep.getNumContainers());
+                       ps.println("+---------------------------------------+");
+               }
+               ps.println("Summary: totalMemory "+totalMemory+" totalCores 
"+totalCores);
+               List<QueueInfo> qInfo = yarnClient.getAllQueues();
+               for(QueueInfo q : qInfo) {
+                       ps.println("Queue: "+q.getQueueName()+", Current 
Capacity: "+q.getCurrentCapacity()+" Max Capacity: "+q.getMaximumCapacity()+" 
Applications: "+q.getApplications().size());
+               }
+               yarnClient.stop();
+               return baos.toString();
+       }
+
+       public static class YarnDeploymentException extends RuntimeException {
+               public YarnDeploymentException() {
+               }
+
+               public YarnDeploymentException(String message) {
+                       super(message);
+               }
+
+               public YarnDeploymentException(String message, Throwable cause) 
{
+                       super(message, cause);
+               }
+       }
+
+}
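
The deployment loop above polls YarnClient.getApplicationReport() once per
second and treats FAILED, FINISHED and KILLED as terminal errors while waiting
for RUNNING. A minimal standalone sketch of that wait pattern, assuming an
already-submitted application (the DeploymentWaiter class and its method name
are illustrative, not part of the patch):

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ApplicationReport;
    import org.apache.hadoop.yarn.client.api.YarnClient;

    final class DeploymentWaiter {
        // Block until the application is RUNNING; fail fast on terminal states.
        static ApplicationReport waitForDeployment(YarnClient yarnClient, ApplicationId appId)
                throws Exception {
            int waitTimeMs = 0;
            while (true) {
                ApplicationReport report = yarnClient.getApplicationReport(appId);
                switch (report.getYarnApplicationState()) {
                    case FAILED:
                    case FINISHED:
                    case KILLED:
                        // terminal before RUNNING: the deployment failed
                        throw new RuntimeException("Application ended in state "
                                + report.getYarnApplicationState()
                                + ". Diagnostics: " + report.getDiagnostics());
                    case RUNNING:
                        return report; // the cluster is up
                    default:
                        if (waitTimeMs > 60000) {
                            System.err.println("Deployment took more than 60 seconds; "
                                    + "check that the requested resources are available.");
                        }
                }
                waitTimeMs += 1000;
                Thread.sleep(1000);
            }
        }
    }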

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
new file mode 100644
index 0000000..98abd5e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+
+import static akka.pattern.Patterns.ask;
+
+import akka.actor.Props;
+import akka.util.Timeout;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.None$;
+import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.Awaitable;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkYarnCluster.class);
+
+       private static final int POLLING_THREAD_INTERVAL_MS = 1000;
+
+       private YarnClient yarnClient;
+       private Thread actorRunner;
+       private Thread clientShutdownHook = new ClientShutdownHook();
+       private PollingThread pollingRunner;
+       private Configuration hadoopConfig;
+       // (HDFS) location of the files required to run on YARN. Needed here to 
delete them on shutdown.
+       private Path sessionFilesDir;
+       private InetSocketAddress jobManagerAddress;
+
+       //---------- Class internal fields -------------------
+
+       private ActorSystem actorSystem;
+       private ActorRef applicationClient;
+       private ApplicationReport initialAppReport;
+       private static FiniteDuration akkaDuration = Duration.apply(5, 
TimeUnit.SECONDS);
+       private static Timeout akkaTimeout = 
Timeout.durationToTimeout(akkaDuration);
+
+       public FlinkYarnCluster(final YarnClient yarnClient, final 
ApplicationId appId,
+                                                       Configuration 
hadoopConfig, Path sessionFilesDir) throws IOException, YarnException {
+               this.yarnClient = yarnClient;
+               this.hadoopConfig = hadoopConfig;
+               this.sessionFilesDir = sessionFilesDir;
+
+               // get one application report manually
+               initialAppReport = yarnClient.getApplicationReport(appId);
+               String jobManagerHost = initialAppReport.getHost();
+               int jobManagerPort = initialAppReport.getRpcPort();
+               this.jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort);
+
+               // start actor system
+               LOG.info("Start actor system.");
+               InetAddress ownHostname = 
NetUtils.resolveAddress(jobManagerAddress); // find name of own public 
interface, able to connect to the JM
+               actorSystem = 
YarnUtils.createActorSystem(ownHostname.getCanonicalHostName(), 0, 
GlobalConfiguration.getConfiguration()); // set port automatically.
+
+               // start application client
+               LOG.info("Start application client.");
+
+               applicationClient = 
actorSystem.actorOf(Props.create(ApplicationClient.class));
+
+               // instruct the ApplicationClient to start periodic status polling
+               applicationClient.tell(new 
Messages.LocalRegisterClient(jobManagerHost + ":" + jobManagerPort), 
applicationClient);
+
+
+               // add hook to ensure proper shutdown
+               Runtime.getRuntime().addShutdownHook(clientShutdownHook);
+
+               actorRunner = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               // blocks until ApplicationMaster has been 
stopped
+                               actorSystem.awaitTermination();
+
+                               // get final application report
+                               try {
+                                       ApplicationReport appReport = 
yarnClient.getApplicationReport(appId);
+
+                                       LOG.info("Application " + appId + " 
finished with state " + appReport
+                                                       
.getYarnApplicationState() + " and final state " + appReport
+                                                       
.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
+
+                                       if(appReport.getYarnApplicationState() 
== YarnApplicationState.FAILED || appReport.getYarnApplicationState()
+                                                       == 
YarnApplicationState.KILLED  ) {
+                                               LOG.warn("Application failed. 
Diagnostics "+appReport.getDiagnostics());
+                                               LOG.warn("If log aggregation is 
activated in the Hadoop cluster, we recommend to retrieve "
+                                                               + "the full 
application log using this command:\n"
+                                                               + "\tyarn logs 
-applicationId "+appReport.getApplicationId()+"\n"
+                                                               + "(It 
sometimes takes a few seconds until the logs are aggregated)");
+                                       }
+                               } catch(Exception e) {
+                                       LOG.warn("Error while getting final 
application report", e);
+                               }
+                       }
+               });
+               actorRunner.setDaemon(true);
+               actorRunner.start();
+
+               pollingRunner = new PollingThread(yarnClient, appId);
+               pollingRunner.setDaemon(true);
+               pollingRunner.start();
+       }
+
+       // -------------------------- Interaction with the cluster 
------------------------
+
+       @Override
+       public InetSocketAddress getJobManagerAddress() {
+               return jobManagerAddress;
+       }
+
+       @Override
+       public String getWebInterfaceURL() {
+               return this.initialAppReport.getTrackingUrl();
+       }
+
+
+       @Override
+       public FlinkYarnClusterStatus getClusterStatus() {
+               if(hasBeenStopped()) {
+                       throw new RuntimeException("The FlinkYarnCluster has 
alread been stopped");
+               }
+               Future<Object> clusterStatusOption = ask(applicationClient, 
Messages.LocalGetYarnClusterStatus$.MODULE$, akkaTimeout);
+               Object clusterStatus = awaitUtil(clusterStatusOption, "Unable 
to get Cluster status from Application Client");
+               if(clusterStatus instanceof None$) {
+                       return null;
+               } else if(clusterStatus instanceof Some) {
+                       return (FlinkYarnClusterStatus) (((Some) 
clusterStatus).get());
+               } else {
+                       throw new RuntimeException("Unexpected type: 
"+clusterStatus.getClass().getCanonicalName());
+               }
+       }
+
+       @Override
+       public boolean hasFailed() {
+               if(pollingRunner == null) {
+                       LOG.warn("FlinkYarnCluster.hasFailed() has been called 
on an uninitialized cluster." +
+                                       "The system might be in an erroneous 
state");
+               }
+               ApplicationReport lastReport = pollingRunner.getLastReport();
+               if(lastReport == null) {
+                       LOG.warn("FlinkYarnCluster.hasFailed() has been called 
on a cluster. that didn't receive a status so far." +
+                                       "The system might be in an erroneous 
state");
+                       return false;
+               } else {
+                       return (lastReport.getYarnApplicationState() == 
YarnApplicationState.FAILED ||
+                                       lastReport.getYarnApplicationState() == 
YarnApplicationState.KILLED);
+               }
+       }
+
+       @Override
+       public String getDiagnostics() {
+               if (!hasFailed()) {
+                       LOG.warn("getDiagnostics() called for cluster which is 
not in failed state");
+               }
+               ApplicationReport lastReport = pollingRunner.getLastReport();
+               if (lastReport == null) {
+                       LOG.warn("Last report is null");
+                       return null;
+               } else {
+                       return lastReport.getDiagnostics();
+               }
+       }
+
+       @Override
+       public List<String> getNewMessages() {
+               if(hasBeenStopped()) {
+                       throw new RuntimeException("The FlinkYarnCluster has 
alread been stopped");
+               }
+               List<String> ret = new ArrayList<String>();
+               // get messages from ApplicationClient (locally)
+               while(true) {
+                       Future<Object> messageOptionFuture = 
ask(applicationClient, Messages.LocalGetYarnMessage$.MODULE$, akkaTimeout);
+                       Object messageOption = awaitUtil(messageOptionFuture, "Error getting new messages from Application Client");
+                       if(messageOption instanceof None$) {
+                               break;
+                       } else if(messageOption instanceof 
org.apache.flink.yarn.Messages.YarnMessage) {
+                               Messages.YarnMessage msg = 
(Messages.YarnMessage) messageOption;
+                               ret.add("["+msg.date()+"] "+msg.message());
+                       } else {
+                               LOG.warn("LocalGetYarnMessage returned 
unexpected type: "+messageOption);
+                       }
+               }
+               return ret;
+       }
+
+       private static <T> T awaitUtil(Awaitable<T> awaitable, String message) {
+               try {
+                       return Await.result(awaitable, akkaDuration);
+               } catch (Exception e) {
+                       throw new RuntimeException(message, e);
+               }
+       }
+
+       // -------------------------- Shutdown handling ------------------------
+
+       private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);
+       @Override
+       public void shutdown() {
+               shutdownInternal(true);
+       }
+
+       private void shutdownInternal(boolean removeShutdownHook) {
+               if(hasBeenShutDown.getAndSet(true)) {
+                       return;
+               }
+               // the session is being stopped explicitly.
+               if(removeShutdownHook) {
+                       
Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
+               }
+               if(actorSystem != null){
+                       LOG.info("Sending shutdown request to the Application 
Master");
+                       if(applicationClient != ActorRef.noSender()) {
+                               Future<Object> future = ask(applicationClient, 
new Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED), akkaTimeout);
+                               awaitUtil(future, "Error while stopping YARN 
Application Client");
+                       }
+
+                       actorSystem.shutdown();
+                       actorSystem.awaitTermination();
+
+                       actorSystem = null;
+               }
+
+               LOG.info("Deleting files in "+sessionFilesDir );
+               try {
+                       FileSystem shutFS = FileSystem.get(hadoopConfig);
+                       shutFS.delete(sessionFilesDir, true); // delete conf 
and jar file.
+                       shutFS.close();
+               } catch (IOException e) {
+                       LOG.error("Could not delete the Flink jar and configuration files in HDFS.", e);
+               }
+
+               try {
+                       actorRunner.join(1000); // wait for 1 second
+               } catch (InterruptedException e) {
+                       LOG.warn("Shutdown of the actor runner was 
interrupted", e);
+                       Thread.currentThread().interrupt();
+               }
+               try {
+                       pollingRunner.stopRunner();
+                       pollingRunner.join(1000);
+               } catch(InterruptedException e) {
+                       LOG.warn("Shutdown of the polling runner was 
interrupted", e);
+                       Thread.currentThread().interrupt();
+               }
+
+               LOG.info("YARN Client is shutting down");
+               yarnClient.stop(); // actorRunner is using the yarnClient.
+               yarnClient = null; // set null to clearly see if somebody wants 
to access it afterwards.
+       }
+
+       @Override
+       public boolean hasBeenStopped() {
+               return hasBeenShutDown.get();
+       }
+
+
+       public class ClientShutdownHook extends Thread {
+               @Override
+               public void run() {
+                       shutdownInternal(false);
+               }
+       }
+
+       // -------------------------- Polling ------------------------
+
+       public static class PollingThread extends Thread {
+
+               AtomicBoolean running = new AtomicBoolean(true);
+               private YarnClient yarnClient;
+               private ApplicationId appId;
+
+               // ------- status information stored in the polling thread
+               private Object lock = new Object();
+               private ApplicationReport lastReport;
+
+
+               public PollingThread(YarnClient yarnClient, ApplicationId 
appId) {
+                       this.yarnClient = yarnClient;
+                       this.appId = appId;
+               }
+
+               public void stopRunner() {
+                       if(!running.get()) {
+                               LOG.warn("Polling thread was already stopped");
+                       }
+                       running.set(false);
+               }
+
+               public ApplicationReport getLastReport() {
+                       synchronized (lock) {
+                               return lastReport;
+                       }
+               }
+
+               @Override
+               public void run() {
+                       while (running.get() && 
yarnClient.isInState(Service.STATE.STARTED)) {
+                               try {
+                                       ApplicationReport report = 
yarnClient.getApplicationReport(appId);
+                                       synchronized (lock) {
+                                               lastReport = report;
+                                       }
+                               } catch (Exception e) {
+                                       LOG.warn("Error while getting 
application report", e);
+                                       // TODO: do more here.
+                               }
+                               try {
+                                       
Thread.sleep(FlinkYarnCluster.POLLING_THREAD_INTERVAL_MS);
+                               } catch (InterruptedException e) {
+                                       LOG.error("Polling thread got 
interrupted", e);
+                                       Thread.currentThread().interrupt(); // 
pass interrupt.
+                               }
+                       }
+                       if(running.get() && 
!yarnClient.isInState(Service.STATE.STARTED)) {
+                               // == if the polling thread is still running 
but the yarn client is stopped.
+                               LOG.warn("YARN client is unexpected in state 
"+yarnClient.getServiceState());
+                       }
+               }
+       }
+
+}
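
Two details of FlinkYarnCluster's teardown are easy to miss: shutdownInternal()
is guarded by an AtomicBoolean so that an explicit shutdown() and the JVM
shutdown hook cannot both run it, and the hook is only removed when the
shutdown is explicit (removing it from within the hook itself would throw an
IllegalStateException). A condensed sketch of that pattern, with illustrative
names:

    import java.util.concurrent.atomic.AtomicBoolean;

    final class OnceShutdown {
        private final AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);
        private final Thread shutdownHook = new Thread(new Runnable() {
            @Override
            public void run() {
                shutdownInternal(false); // triggered by the JVM: keep the hook registered
            }
        });

        OnceShutdown() {
            Runtime.getRuntime().addShutdownHook(shutdownHook);
        }

        public void shutdown() {
            shutdownInternal(true); // explicit call: the hook is no longer needed
        }

        private void shutdownInternal(boolean removeShutdownHook) {
            if (hasBeenShutDown.getAndSet(true)) {
                return; // the other path already ran the teardown
            }
            if (removeShutdownHook) {
                Runtime.getRuntime().removeShutdownHook(shutdownHook);
            }
            // ... release cluster resources here ...
        }
    }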

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
new file mode 100644
index 0000000..8bb2668
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+public class Utils {
+       
+       private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+       private static final int DEFAULT_HEAP_LIMIT_CAP = 500;
+       private static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.8f;
+
+       /**
+        * Calculate the heap size for the JVMs to start in the containers.
+        * Since JVMs allocate more than just the heap space, and YARN is very
+        * fast at killing processes that use memory beyond their limit, we have to come
+        * up with a good heap size.
+        * This code keeps the configured cutoff ratio of the given amount of memory (in MB),
+        * 80% by default. If the amount removed by the cutoff exceeds the heap limit cap
+        * (500 MB by default), only the cap is subtracted. See the worked sketch after
+        * this file's diff.
+        */
+       public static int calculateHeapSize(int memory) {
+               float memoryCutoffRatio = 
GlobalConfiguration.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 
DEFAULT_YARN_HEAP_CUTOFF_RATIO);
+               int heapLimitCap = 
GlobalConfiguration.getInteger(ConfigConstants.YARN_HEAP_LIMIT_CAP, 
DEFAULT_HEAP_LIMIT_CAP);
+
+               int heapLimit = (int)((float)memory * memoryCutoffRatio);
+               if( (memory - heapLimit) > heapLimitCap) {
+                       heapLimit = memory-heapLimitCap;
+               }
+               return heapLimit;
+       }
+       
+       private static void addPathToConfig(Configuration conf, File path) {
+               // chain-in a new classloader
+               URL fileUrl = null;
+               try {
+                       fileUrl = path.toURI().toURL();
+               } catch (MalformedURLException e) {
+                       throw new RuntimeException("Erroneous config file 
path", e);
+               }
+               URL[] urls = {fileUrl};
+               ClassLoader cl = new URLClassLoader(urls, 
conf.getClassLoader());
+               conf.setClassLoader(cl);
+       }
+       
+       private static void setDefaultConfValues(Configuration conf) {
+               if(conf.get("fs.hdfs.impl",null) == null) {
+                       conf.set("fs.hdfs.impl", 
"org.apache.hadoop.hdfs.DistributedFileSystem");
+               }
+               if(conf.get("fs.file.impl",null) == null) {
+                       conf.set("fs.file.impl", 
"org.apache.hadoop.fs.LocalFileSystem");
+               }
+       }
+       
+       public static Configuration initializeYarnConfiguration() {
+               Configuration conf = new YarnConfiguration();
+               String configuredHadoopConfig = 
GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+               if(configuredHadoopConfig != null) {
+                       LOG.info("Using hadoop configuration path from " + 
ConfigConstants.PATH_HADOOP_CONFIG + " setting.");
+                       addPathToConfig(conf, new File(configuredHadoopConfig));
+                       setDefaultConfValues(conf);
+                       return conf;
+               }
+               String[] envs = { "YARN_CONF_DIR", "HADOOP_CONF_DIR", 
"HADOOP_CONF_PATH" };
+               for(int i = 0; i < envs.length; ++i) {
+                       String confPath = System.getenv(envs[i]);
+                       if (confPath != null) {
+                               LOG.info("Found "+envs[i]+", adding it to 
configuration");
+                               addPathToConfig(conf, new File(confPath));
+                               setDefaultConfValues(conf);
+                               return conf;
+                       }
+               }
+               LOG.info("Could not find HADOOP_CONF_PATH, using HADOOP_HOME.");
+               String hadoopHome = null;
+               try {
+                       hadoopHome = Shell.getHadoopHome();
+               } catch (IOException e) {
+                       throw new RuntimeException("Unable to get hadoop home. 
Please set HADOOP_HOME variable!", e);
+               }
+               File tryConf = new File(hadoopHome+"/etc/hadoop");
+               if(tryConf.exists()) {
+                       LOG.info("Found configuration using hadoop home.");
+                       addPathToConfig(conf, tryConf);
+               } else {
+                       tryConf = new File(hadoopHome+"/conf");
+                       if(tryConf.exists()) {
+                               addPathToConfig(conf, tryConf);
+                       }
+               }
+               setDefaultConfValues(conf);
+               return conf;
+       }
+       
+       public static void setupEnv(Configuration conf, Map<String, String> 
appMasterEnv) {
+               addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), 
Environment.PWD.$() + File.separator + "*");
+               for (String c : 
conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH))
 {
+                       addToEnvironment(appMasterEnv, 
Environment.CLASSPATH.name(), c.trim());
+               }
+       }
+       
+       
+       /**
+        * Copies the given local resource into the ".flink/appId" directory on the target
+        * file system and registers it as a YARN local resource.
+        *
+        * @return Path to remote file (usually hdfs)
+        * @throws IOException if copying or registering the resource fails
+        */
+       public static Path setupLocalResource(Configuration conf, FileSystem 
fs, String appId, Path localRsrcPath, LocalResource appMasterJar, Path homedir)
+                       throws IOException {
+               // copy to HDFS
+               String suffix = ".flink/" + appId + "/" + 
localRsrcPath.getName();
+               
+               Path dst = new Path(homedir, suffix);
+               
+               LOG.info("Copying from "+localRsrcPath+" to "+dst );
+               fs.copyFromLocalFile(localRsrcPath, dst);
+               registerLocalResource(fs, dst, appMasterJar);
+               return dst;
+       }
+       
+       public static void registerLocalResource(FileSystem fs, Path 
remoteRsrcPath, LocalResource localResource) throws IOException {
+               FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);
+               
localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
+               localResource.setSize(jarStat.getLen());
+               localResource.setTimestamp(jarStat.getModificationTime());
+               localResource.setType(LocalResourceType.FILE);
+               
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+       }
+
+       public static void setTokensFor(ContainerLaunchContext amContainer, 
Path[] paths, Configuration conf) throws IOException {
+               Credentials credentials = new Credentials();
+               // for HDFS
+               TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
+               // for user
+               UserGroupInformation currUsr = 
UserGroupInformation.getCurrentUser();
+               
+               Collection<Token<? extends TokenIdentifier>> usrTok = 
currUsr.getTokens();
+               for(Token<? extends TokenIdentifier> token : usrTok) {
+                       final Text id = new Text(token.getIdentifier());
+                       LOG.info("Adding user token "+id+" with "+token);
+                       credentials.addToken(id, token);
+               }
+               DataOutputBuffer dob = new DataOutputBuffer();
+               credentials.writeTokenStorageToStream(dob);
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("Wrote tokens. Credentials buffer length: " + 
dob.getLength());
+               }
+
+               ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, 
dob.getLength());
+               amContainer.setTokens(securityTokens);
+       }
+       
+       public static void logFilesInCurrentDirectory(final Logger logger) {
+               new File(".").list(new FilenameFilter() {
+                       
+                       @Override
+                       public boolean accept(File dir, String name) {
+                               logger.info(dir.getAbsolutePath()+"/"+name);
+                               return true;
+                       }
+               });
+       }
+       
+       /**
+        * Copied method from org.apache.hadoop.yarn.util.Apps
+        * It was broken by YARN-1824 (2.4.0) and fixed for 2.4.1
+        * by https://issues.apache.org/jira/browse/YARN-1931
+        */
+       public static void addToEnvironment(Map<String, String> environment,
+                       String variable, String value) {
+               String val = environment.get(variable);
+               if (val == null) {
+                       val = value;
+               } else {
+                       val = val + File.pathSeparator + value;
+               }
+               environment.put(StringInterner.weakIntern(variable),
+                               StringInterner.weakIntern(val));
+       }
+}
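
A quick worked check of the heap sizing rule above, under the default cutoff
ratio (0.8) and heap limit cap (500 MB); the example class is hypothetical and
simply inlines the same arithmetic as Utils.calculateHeapSize():

    public class HeapSizeExample {
        // Same rule as Utils.calculateHeapSize() with the default settings.
        static int calculateHeapSize(int memory) {
            float memoryCutoffRatio = 0.8f;
            int heapLimitCap = 500;
            int heapLimit = (int) ((float) memory * memoryCutoffRatio);
            if ((memory - heapLimit) > heapLimitCap) {
                heapLimit = memory - heapLimitCap;
            }
            return heapLimit;
        }

        public static void main(String[] args) {
            // 1024 MB container: the 20% cutoff is 205 MB, below the cap -> keep 80%
            System.out.println(calculateHeapSize(1024)); // prints 819
            // 4096 MB container: the 20% cutoff would be 820 MB, above the cap -> subtract 500
            System.out.println(calculateHeapSize(4096)); // prints 3596
        }
    }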

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
new file mode 100644
index 0000000..3f1cc23
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.appMaster;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.yarn.YarnUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.yarn.FlinkYarnClient;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import scala.Tuple2;
+
+
+public class YarnTaskManagerRunner {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(YarnTaskManagerRunner.class);
+
+       public static void main(final String[] args) throws IOException {
+               Map<String, String> envs = System.getenv();
+               final String yarnClientUsername = 
envs.get(FlinkYarnClient.ENV_CLIENT_USERNAME);
+               final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+
+               // configure local directory
+               final String[] newArgs = Arrays.copyOf(args, args.length + 2);
+               newArgs[newArgs.length-2] = "--tempDir";
+               newArgs[newArgs.length-1] = localDirs;
+               LOG.info("Setting log path "+localDirs);
+               LOG.info("YARN daemon runs as 
'"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
+                               + " user to execute Flink TaskManager to 
'"+yarnClientUsername+"'");
+               UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(yarnClientUsername);
+               for(Token<? extends TokenIdentifier> toks : 
UserGroupInformation.getCurrentUser().getTokens()) {
+                       ugi.addToken(toks);
+               }
+               ugi.doAs(new PrivilegedAction<Object>() {
+                       @Override
+                       public Object run() {
+                               try {
+                                       Tuple2<ActorSystem, ActorRef> tuple = 
YarnUtils
+                                                       
.startActorSystemAndTaskManager(newArgs);
+
+                                       tuple._1().awaitTermination();
+                               } catch (Exception e) {
+                                       LOG.error("Error while running the 
TaskManager", e);
+                               }
+                               return null;
+                       }
+               });
+       }
+}
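
YarnTaskManagerRunner above starts the TaskManager under the identity of the
user who submitted the session, not the user the YARN daemon runs as, by
creating a remote UGI and re-attaching the daemon user's delegation tokens
before doAs(). That pattern in isolation (the RunAs wrapper and the runAsUser
name are illustrative):

    import java.io.IOException;
    import java.security.PrivilegedAction;

    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenIdentifier;

    final class RunAs {
        static void runAsUser(String userName, final Runnable work) throws IOException {
            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(userName);
            // carry over the HDFS/YARN delegation tokens of the current user
            for (Token<? extends TokenIdentifier> token :
                    UserGroupInformation.getCurrentUser().getTokens()) {
                ugi.addToken(token);
            }
            ugi.doAs(new PrivilegedAction<Void>() {
                @Override
                public Void run() {
                    work.run(); // executes with the new identity
                    return null;
                }
            });
        }
    }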

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
new file mode 100644
index 0000000..22f4c02
--- /dev/null
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration}
+import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus
+import org.apache.flink.yarn.Messages._
+import scala.collection.mutable
+import scala.concurrent.duration._
+
+class ApplicationClient
+
+  extends Actor with ActorLogMessages with ActorLogging {
+  import context._
+
+  val INITIAL_POLLING_DELAY = 0 seconds
+  val WAIT_FOR_YARN_INTERVAL = 2 seconds
+  val POLLING_INTERVAL = 3 seconds
+
+  var yarnJobManager: Option[ActorRef] = None
+  var pollingTimer: Option[Cancellable] = None
+  implicit var timeout: FiniteDuration = 0 seconds
+  var running = false
+  var messagesQueue : mutable.Queue[YarnMessage] = mutable.Queue[YarnMessage]()
+  var latestClusterStatus : Option[FlinkYarnClusterStatus] = None
+  var stopMessageReceiver : Option[ActorRef] = None
+
+  override def preStart(): Unit = {
+    super.preStart()
+
+    timeout = new 
FiniteDuration(GlobalConfiguration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+  }
+
+  override def postStop(): Unit = {
+    log.info("Stopped Application client.")
+    pollingTimer foreach {
+      _.cancel()
+    }
+
+    pollingTimer = None
+  }
+
+  override def receiveWithLogMessages: Receive = {
+    // ----------------------------- Registration -> Status updates -> 
shutdown ----------------
+    case LocalRegisterClient(address: String) => {
+      val jmAkkaUrl = JobManager.getRemoteAkkaURL(address)
+
+      yarnJobManager = Some(AkkaUtils.getReference(jmAkkaUrl)(system, timeout))
+      yarnJobManager match {
+        case Some(jm) => {
+          // the message came from the FlinkYarnCluster. We send the message 
to the JobManager.
+          // it is important not to forward the message because the JobManager 
is storing the
+          // sender as the Application Client (this class).
+          jm ! RegisterClient
+
+          // schedule a periodic status report from the JobManager
+          // request the number of task managers and slots from the job manager
+          pollingTimer = 
Some(context.system.scheduler.schedule(INITIAL_POLLING_DELAY,
+            WAIT_FOR_YARN_INTERVAL, yarnJobManager.get, PollYarnClusterStatus))
+        }
+        case None => throw new RuntimeException("Registration at 
JobManager/ApplicationMaster " +
+          "failed. Job Manager RPC connection has not properly been 
initialized");
+      }
+    }
+    case msg: StopYarnSession => {
+      log.info("Stop yarn session.")
+      stopMessageReceiver = Some(sender())
+      yarnJobManager foreach {
+        _ forward msg
+      }
+    }
+    case JobManagerStopped => {
+      log.info("Remote JobManager has been stopped successfully. " +
+        "Stopping local application client")
+      stopMessageReceiver foreach {
+        _ ! JobManagerStopped
+      }
+      // stop ourselves
+      context.system.shutdown()
+    }
+
+    // handle the responses from the PollYarnClusterStatus messages to the 
yarn job mgr
+    case status: FlinkYarnClusterStatus => {
+      latestClusterStatus = Some(status)
+    }
+
+    // locally get cluster status
+    case LocalGetYarnClusterStatus => {
+      sender() ! latestClusterStatus
+    }
+
+    // -----------------  handle messages from the cluster -------------------
+    // receive remote messages
+    case msg: YarnMessage => {
+      messagesQueue.enqueue(msg)
+    }
+    // locally forward messages
+    case LocalGetYarnMessage => {
+      sender() ! (if (messagesQueue.isEmpty) None else messagesQueue.dequeue)
+    }
+    case _ =>
+  }
+
+}
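
The message handling in ApplicationClient above is a simple drain protocol:
incoming YarnMessages are buffered in a local queue, and each
LocalGetYarnMessage request is answered with either the next message or None,
which is what lets FlinkYarnCluster.getNewMessages() loop until it sees None.
The same protocol in plain Java, with Optional standing in for Scala's Option
(all names illustrative):

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Optional;
    import java.util.Queue;

    final class MessageBuffer {
        private final Queue<String> queue = new ArrayDeque<String>();

        // producer side: case msg: YarnMessage => messagesQueue.enqueue(msg)
        synchronized void enqueue(String msg) {
            queue.add(msg);
        }

        // consumer request: case LocalGetYarnMessage => next message, or empty
        synchronized Optional<String> next() {
            return Optional.ofNullable(queue.poll());
        }

        // client side: drain until the buffer reports empty
        static List<String> drain(MessageBuffer buffer) {
            List<String> out = new ArrayList<String>();
            Optional<String> msg;
            while ((msg = buffer.next()).isPresent()) {
                out.add(msg.get());
            }
            return out;
        }
    }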

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
new file mode 100644
index 0000000..fd67b01
--- /dev/null
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import java.io.{PrintWriter, FileWriter, BufferedWriter}
+import java.security.PrivilegedAction
+
+import akka.actor._
+import org.apache.flink.client.CliFrontend
+import org.apache.flink.configuration.{GlobalConfiguration, ConfigConstants}
+import org.apache.flink.runtime.jobmanager.{WithWebServer, JobManager}
+import org.apache.flink.yarn.Messages.StartYarnSession
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.slf4j.LoggerFactory
+
+import scala.io.Source
+
+object ApplicationMaster {
+  import scala.collection.JavaConversions._
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  val CONF_FILE = "flink-conf.yaml"
+  val MODIFIED_CONF_FILE = "flink-conf-modified.yaml"
+
+  def main(args: Array[String]): Unit = {
+    val yarnClientUsername = System.getenv(FlinkYarnClient.ENV_CLIENT_USERNAME)
+    LOG.info(s"YARN daemon runs as 
${UserGroupInformation.getCurrentUser.getShortUserName}" +
+      s" setting user to execute Flink ApplicationMaster/JobManager to 
${yarnClientUsername}")
+
+    val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername)
+
+    for(token <- UserGroupInformation.getCurrentUser.getTokens){
+      ugi.addToken(token)
+    }
+
+    ugi.doAs(new PrivilegedAction[Object] {
+      override def run(): Object = {
+        var actorSystem: ActorSystem = null
+        var jobManager: ActorRef = ActorRef.noSender
+
+        try {
+          val conf = Utils.initializeYarnConfiguration()
+
+          val env = System.getenv()
+
+          if(LOG.isDebugEnabled) {
+            LOG.debug("All environment variables: " + env.toString)
+          }
+
+          val currDir = env.get(Environment.PWD.key())
+          require(currDir != null, "Current directory unknown.")
+
+          val logDirs = env.get(Environment.LOG_DIRS.key())
+
+          // Note that we use the "ownHostname" given by YARN here, to make 
sure
+          // we use the hostnames given by YARN consitently throuout akka.
+          // for akka "localhost" and "localhost.localdomain" are different 
actors.
+          val ownHostname = env.get(Environment.NM_HOST.key())
+          require(ownHostname != null, s"Own hostname not set.")
+
+          val taskManagerCount = env.get(FlinkYarnClient.ENV_TM_COUNT).toInt
+          val slots = env.get(FlinkYarnClient.ENV_SLOTS).toInt
+          val dynamicPropertiesEncodedString = 
env.get(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES)
+
+          val jobManagerWebPort = 0 // automatic assignment.
+
+          val (system, actor) = startJobManager(currDir, ownHostname, dynamicPropertiesEncodedString)
+
+          actorSystem = system
+          jobManager = actor
+          val extActor = system.asInstanceOf[ExtendedActorSystem]
+          val jobManagerPort = extActor.provider.getDefaultAddress.port.get
+
+          // generate configuration file for TaskManagers
+          generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, 
ownHostname,
+            jobManagerPort, jobManagerWebPort, logDirs, slots, 
taskManagerCount,
+            dynamicPropertiesEncodedString)
+
+
+          // send "start yarn session" message to YarnJobManager.
+          LOG.info("Start yarn session on job manager.")
+          jobManager ! StartYarnSession(conf, jobManagerPort)
+
+          LOG.info("Application Master properly initiated. Await termination 
of actor system.")
+          actorSystem.awaitTermination()
+        } catch {
+          case t: Throwable =>
+            LOG.error("Error while running the application master.", t)
+
+            if(actorSystem != null){
+              actorSystem.shutdown()
+              actorSystem.awaitTermination()
+
+              actorSystem = null
+            }
+        }
+
+        null
+      }
+    })
+
+  }
+
+  def generateConfigurationFile(fileName: String, currDir: String, 
ownHostname: String,
+                               jobManagerPort: Int,
+                               jobManagerWebPort: Int, logDirs: String, slots: 
Int,
+                               taskManagerCount: Int, 
dynamicPropertiesEncodedString: String)
+  : Unit = {
+    LOG.info("Generate configuration file for application master.")
+    val output = new PrintWriter(new BufferedWriter(
+      new FileWriter(fileName))
+    )
+
+    for (line <- Source.fromFile(s"$currDir/$CONF_FILE").getLines() if 
!(line.contains
+      (ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY))) {
+      output.println(line)
+    }
+
+    output.println(s"${ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY}: 
$ownHostname")
+    output.println(s"${ConfigConstants.JOB_MANAGER_IPC_PORT_KEY}: 
$jobManagerPort")
+
+    output.println(s"${ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY}: 
$logDirs")
+    output.println(s"${ConfigConstants.JOB_MANAGER_WEB_PORT_KEY}: 
$jobManagerWebPort")
+
+
+    if(slots != -1){
+      output.println(s"${ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS}: $slots")
+      output.println(
+        s"${ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY}: 
${slots*taskManagerCount}")
+    }
+
+    // add dynamic properties
+    val dynamicProperties = 
CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
+
+    import scala.collection.JavaConverters._
+
+    for(property <- dynamicProperties.asScala){
+      output.println(s"${property.f0}: ${property.f1}")
+    }
+
+    output.close()
+  }
+
+  def startJobManager(currDir: String, hostname: String, 
dynamicPropertiesEncodedString: String):
+    (ActorSystem, ActorRef) = {
+    LOG.info("Start job manager for yarn")
+    val args = Array[String]("--configDir", currDir)
+
+    LOG.info(s"Config path: ${currDir}.")
+    val (_, _, configuration, _) = JobManager.parseArgs(args)
+
+    // add dynamic properties to JobManager configuration.
+    val dynamicProperties = 
CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
+    import scala.collection.JavaConverters._
+    for(property <- dynamicProperties.asScala){
+      configuration.setString(property.f0, property.f1)
+    }
+    GlobalConfiguration.getConfiguration.addAll(configuration) // make part of 
globalConf.
+
+    // set port to 0 to let Akka automatically determine the port.
+    implicit val jobManagerSystem = YarnUtils.createActorSystem(hostname, port 
= 0, configuration)
+
+    LOG.info("Start job manager actor.")
+    (jobManagerSystem, JobManager.startActor(Props(new 
JobManager(configuration) with
+      WithWebServer with YarnJobManager)))
+  }
+}
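
generateConfigurationFile() above rewrites flink-conf.yaml for the
TaskManagers: every line of the original file is copied except the stale
jobmanager.rpc.address entry, and the values determined at ApplicationMaster
startup are appended. The same copy-filter-append step as a plain Java sketch
(the ConfigRewriter name is illustrative; the keys are the ones used above):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.PrintWriter;

    final class ConfigRewriter {
        static void rewrite(String source, String target,
                            String ownHostname, int jobManagerPort) throws IOException {
            BufferedReader in = new BufferedReader(new FileReader(source));
            PrintWriter out = new PrintWriter(new FileWriter(target));
            try {
                String line;
                while ((line = in.readLine()) != null) {
                    // drop the stale JobManager address; keep everything else
                    if (!line.contains("jobmanager.rpc.address")) {
                        out.println(line);
                    }
                }
                // append the values determined at ApplicationMaster startup
                out.println("jobmanager.rpc.address: " + ownHostname);
                out.println("jobmanager.rpc.port: " + jobManagerPort);
            } finally {
                in.close();
                out.close();
            }
        }
    }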
