Repository: flink
Updated Branches:
  refs/heads/master 6bc6b225e -> 186b12309


[FLINK-5631] [yarn] Support downloading additional jars from non-HDFS paths.

This closes #3202


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/186b1230
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/186b1230
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/186b1230

Branch: refs/heads/master
Commit: 186b12309b540f82a055be28f3f005dce4b8cf46
Parents: 30c5b77
Author: Haohui Mai <whe...@apache.org>
Authored: Tue Jan 31 12:11:01 2017 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 13 20:51:50 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/yarn/Utils.java  | 223 +++++++++++++-
 .../flink/yarn/YarnApplicationMasterRunner.java | 236 +--------------
 .../apache/flink/yarn/YarnResourceManager.java  | 211 +------------
 .../java/org/apache/flink/yarn/UtilsTest.java   | 298 +++++++++++++++++++
 .../yarn/YarnApplicationMasterRunnerTest.java   |  93 ++++++
 .../yarn/YarnFlinkResourceManagerTest.java      | 298 -------------------
 6 files changed, 617 insertions(+), 742 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 94d4582..60f7204 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -23,11 +23,16 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +58,8 @@ import 
org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+
 /**
  * Utility class that provides helper methods to work with Apache Hadoop YARN.
  */
@@ -107,7 +114,7 @@ public final class Utils {
                addToEnvironment(
                        appMasterEnv,
                        Environment.CLASSPATH.name(),
-                       appMasterEnv.get(YarnConfigKeys.ENV_FLINK_CLASSPATH));
+                       appMasterEnv.get(ENV_FLINK_CLASSPATH));
                String[] applicationClassPathEntries = conf.getStrings(
                        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
@@ -264,4 +271,218 @@ public final class Utils {
                }
                return result;
        }
+
+       /**
+        * Creates the launch context, which describes how to bring up a 
TaskExecutor / TaskManager process in
+        * an allocated YARN container.
+        *
+        * <p>This code is extremely YARN specific and registers all the 
resources that the TaskExecutor
+        * needs (such as JAR file, config file, ...) and all environment 
variables in a YARN
+        * container launch context. The launch context then ensures that those 
resources will be
+        * copied into the containers transient working directory.
+        *
+        * @param flinkConfig
+        *               The Flink configuration object.
+        * @param yarnConfig
+        *               The YARN configuration object.
+        * @param env
+        *               The environment variables.
+        * @param tmParams
+        *               The TaskExecutor container memory parameters.
+        * @param taskManagerConfig
+        *               The configuration for the TaskExecutors.
+        * @param workingDirectory
+        *               The current application master container's working 
directory.
+        * @param taskManagerMainClass
+        *               The class with the main method.
+        * @param log
+        *               The logger.
+        *
+        * @return The launch context for the TaskManager processes.
+        *
+        * @throws Exception Thrown if teh launch context could not be created, 
for example if
+        *                                 the resources could not be copied.
+        */
+       static ContainerLaunchContext createTaskExecutorContext(
+               org.apache.flink.configuration.Configuration flinkConfig,
+               YarnConfiguration yarnConfig,
+               Map<String, String> env,
+               ContaineredTaskManagerParameters tmParams,
+               org.apache.flink.configuration.Configuration taskManagerConfig,
+               String workingDirectory,
+               Class<?> taskManagerMainClass,
+               Logger log) throws Exception {
+
+               // get and validate all relevant variables
+
+               String remoteFlinkJarPath = 
env.get(YarnConfigKeys.FLINK_JAR_PATH);
+               require(remoteFlinkJarPath != null, "Environment variable %s 
not set", YarnConfigKeys.FLINK_JAR_PATH);
+
+               String appId = env.get(YarnConfigKeys.ENV_APP_ID);
+               require(appId != null, "Environment variable %s not set", 
YarnConfigKeys.ENV_APP_ID);
+
+               String clientHomeDir = 
env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+               require(clientHomeDir != null, "Environment variable %s not 
set", YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+
+               String shipListString = 
env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+               require(shipListString != null, "Environment variable %s not 
set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+
+               String yarnClientUsername = 
env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+               require(yarnClientUsername != null, "Environment variable %s 
not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+               final String remoteKeytabPath = 
env.get(YarnConfigKeys.KEYTAB_PATH);
+               log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
+
+               final String remoteKeytabPrincipal = 
env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+               log.info("TM:remote keytab principal obtained {}", 
remoteKeytabPrincipal);
+
+               final String remoteYarnConfPath = 
env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
+               log.info("TM:remote yarn conf path obtained {}", 
remoteYarnConfPath);
+
+               final String remoteKrb5Path = 
env.get(YarnConfigKeys.ENV_KRB5_PATH);
+               log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
+
+               String classPathString = env.get(ENV_FLINK_CLASSPATH);
+               require(classPathString != null, "Environment variable %s not 
set", YarnConfigKeys.ENV_FLINK_CLASSPATH);
+
+               //register keytab
+               LocalResource keytabResource = null;
+               if(remoteKeytabPath != null) {
+                       log.info("Adding keytab {} to the AM container local 
resource bucket", remoteKeytabPath);
+                       keytabResource = Records.newRecord(LocalResource.class);
+                       Path keytabPath = new Path(remoteKeytabPath);
+                       FileSystem fs = keytabPath.getFileSystem(yarnConfig);
+                       registerLocalResource(fs, keytabPath, keytabResource);
+               }
+
+               //To support Yarn Secure Integration Test Scenario
+               LocalResource yarnConfResource = null;
+               LocalResource krb5ConfResource = null;
+               boolean hasKrb5 = false;
+               if(remoteYarnConfPath != null && remoteKrb5Path != null) {
+                       log.info("TM:Adding remoteYarnConfPath {} to the 
container local resource bucket", remoteYarnConfPath);
+                       yarnConfResource = 
Records.newRecord(LocalResource.class);
+                       Path yarnConfPath = new Path(remoteYarnConfPath);
+                       FileSystem fs = yarnConfPath.getFileSystem(yarnConfig);
+                       registerLocalResource(fs, yarnConfPath, 
yarnConfResource);
+
+                       log.info("TM:Adding remoteKrb5Path {} to the container 
local resource bucket", remoteKrb5Path);
+                       krb5ConfResource = 
Records.newRecord(LocalResource.class);
+                       Path krb5ConfPath = new Path(remoteKrb5Path);
+                       fs = krb5ConfPath.getFileSystem(yarnConfig);
+                       registerLocalResource(fs, krb5ConfPath, 
krb5ConfResource);
+
+                       hasKrb5 = true;
+               }
+
+               // register Flink Jar with remote HDFS
+               LocalResource flinkJar = Records.newRecord(LocalResource.class);
+               {
+                       Path remoteJarPath = new Path(remoteFlinkJarPath);
+                       FileSystem fs = remoteJarPath.getFileSystem(yarnConfig);
+                       registerLocalResource(fs, remoteJarPath, flinkJar);
+               }
+
+               // register conf with local fs
+               LocalResource flinkConf = 
Records.newRecord(LocalResource.class);
+               {
+                       // write the TaskManager configuration to a local file
+                       final File taskManagerConfigFile =
+                                       new File(workingDirectory, 
UUID.randomUUID() + "-taskmanager-conf.yaml");
+                       log.debug("Writing TaskManager configuration to {}", 
taskManagerConfigFile.getAbsolutePath());
+                       BootstrapTools.writeConfiguration(taskManagerConfig, 
taskManagerConfigFile);
+
+                       Path homeDirPath = new Path(clientHomeDir);
+                       FileSystem fs = homeDirPath.getFileSystem(yarnConfig);
+                       setupLocalResource(fs, appId,
+                                       new 
Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
+
+                       log.info("Prepared local resource for modified yaml: 
{}", flinkConf);
+               }
+
+               Map<String, LocalResource> taskManagerLocalResources = new 
HashMap<>();
+               taskManagerLocalResources.put("flink.jar", flinkJar);
+               taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
+
+               //To support Yarn Secure Integration Test Scenario
+               if(yarnConfResource != null && krb5ConfResource != null) {
+                       taskManagerLocalResources.put(YARN_SITE_FILE_NAME, 
yarnConfResource);
+                       taskManagerLocalResources.put(KRB5_FILE_NAME, 
krb5ConfResource);
+               }
+
+               if(keytabResource != null) {
+                       taskManagerLocalResources.put(KEYTAB_FILE_NAME, 
keytabResource);
+               }
+
+               // prepare additional files to be shipped
+               for (String pathStr : shipListString.split(",")) {
+                       if (!pathStr.isEmpty()) {
+                               LocalResource resource = 
Records.newRecord(LocalResource.class);
+                               Path path = new Path(pathStr);
+                               
registerLocalResource(path.getFileSystem(yarnConfig), path, resource);
+                               taskManagerLocalResources.put(path.getName(), 
resource);
+                       }
+               }
+
+               // now that all resources are prepared, we can create the 
launch context
+
+               log.info("Creating container launch context for TaskManagers");
+
+               boolean hasLogback = new File(workingDirectory, 
"logback.xml").exists();
+               boolean hasLog4j = new File(workingDirectory, 
"log4j.properties").exists();
+
+               String launchCommand = 
BootstrapTools.getTaskManagerShellCommand(
+                               flinkConfig, tmParams, ".", 
ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+                               hasLogback, hasLog4j, hasKrb5, 
taskManagerMainClass);
+
+               log.info("Starting TaskManagers with command: " + 
launchCommand);
+
+               ContainerLaunchContext ctx = 
Records.newRecord(ContainerLaunchContext.class);
+               ctx.setCommands(Collections.singletonList(launchCommand));
+               ctx.setLocalResources(taskManagerLocalResources);
+
+               Map<String, String> containerEnv = new HashMap<>();
+               containerEnv.putAll(tmParams.taskManagerEnv());
+
+               // add YARN classpath, etc to the container environment
+               containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
+               setupYarnClassPath(yarnConfig, containerEnv);
+
+               containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, 
UserGroupInformation.getCurrentUser().getUserName());
+
+               if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
+                       containerEnv.put(YarnConfigKeys.KEYTAB_PATH, 
remoteKeytabPath);
+                       containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, 
remoteKeytabPrincipal);
+               }
+
+               ctx.setEnvironment(containerEnv);
+
+               try (DataOutputBuffer dob = new DataOutputBuffer()) {
+                       log.debug("Adding security tokens to Task Executor 
Container launch Context....");
+                       UserGroupInformation user = 
UserGroupInformation.getCurrentUser();
+                       Credentials credentials = user.getCredentials();
+                       credentials.writeTokenStorageToStream(dob);
+                       ByteBuffer securityTokens = 
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+                       ctx.setTokens(securityTokens);
+               }
+               catch (Throwable t) {
+                       log.error("Getting current user info failed when trying 
to launch the container", t);
+               }
+
+               return ctx;
+       }
+
+       /**
+        * Validates a condition, throwing a RuntimeException if the condition 
is violated.
+        *
+        * @param condition The condition.
+        * @param message The message for the runtime exception, with format 
variables as defined by
+        *                {@link String#format(String, Object...)}.
+        * @param values The format arguments.
+        */
+       static void require(boolean condition, String message, Object... 
values) {
+               if (!condition) {
+                       throw new RuntimeException(String.format(message, 
values));
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 5cc51e4..9d5673c 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -47,16 +47,10 @@ import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.Records;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,19 +60,14 @@ import scala.Some;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Map;
-import java.util.HashMap;
-import java.util.UUID;
-import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.Utils.require;
 
 /**
  * This class is the executable entry point for the YARN application master.
@@ -329,7 +318,7 @@ public class YarnApplicationMasterRunner {
                                        config, akkaHostname, akkaPort, 
slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
                        LOG.debug("TaskManager configuration: {}", 
taskManagerConfig);
 
-                       final ContainerLaunchContext taskManagerContext = 
createTaskManagerContext(
+                       final ContainerLaunchContext taskManagerContext = 
Utils.createTaskExecutorContext(
                                config, yarnConfig, ENV,
                                taskManagerParameters, taskManagerConfig,
                                currDir, getTaskManagerClass(), LOG);
@@ -483,20 +472,6 @@ public class YarnApplicationMasterRunner {
        // 
------------------------------------------------------------------------
 
        /**
-        * Validates a condition, throwing a RuntimeException if the condition 
is violated.
-        * 
-        * @param condition The condition.
-        * @param message The message for the runtime exception, with format 
variables as defined by
-        *                {@link String#format(String, Object...)}.
-        * @param values The format arguments.
-        */
-       private static void require(boolean condition, String message, 
Object... values) {
-               if (!condition) {
-                       throw new RuntimeException(String.format(message, 
values));
-               }
-       }
-
-       /**
         * 
         * @param baseDirectory
         * @param additional
@@ -549,211 +524,4 @@ public class YarnApplicationMasterRunner {
 
                return configuration;
        }
-
-       /**
-        * Creates the launch context, which describes how to bring up a 
TaskManager process in
-        * an allocated YARN container.
-        * 
-        * <p>This code is extremely YARN specific and registers all the 
resources that the TaskManager
-        * needs (such as JAR file, config file, ...) and all environment 
variables in a YARN
-        * container launch context. The launch context then ensures that those 
resources will be
-        * copied into the containers transient working directory. 
-        * 
-        * <p>We do this work before we start the ResourceManager actor in 
order to fail early if
-        * any of the operations here fail.
-        * 
-        * @param flinkConfig
-        *         The Flink configuration object.
-        * @param yarnConfig
-        *         The YARN configuration object.
-        * @param env
-        *         The environment variables.
-        * @param tmParams
-        *         The TaskManager container memory parameters. 
-        * @param taskManagerConfig
-        *         The configuration for the TaskManagers.
-        * @param workingDirectory
-        *         The current application master container's working 
directory. 
-        * @param taskManagerMainClass
-        *         The class with the main method.
-        * @param log
-        *         The logger.
-        * 
-        * @return The launch context for the TaskManager processes.
-        * 
-        * @throws Exception Thrown if teh launch context could not be created, 
for example if
-        *                   the resources could not be copied.
-        */
-       public static ContainerLaunchContext createTaskManagerContext(
-                       Configuration flinkConfig,
-                       YarnConfiguration yarnConfig,
-                       Map<String, String> env,
-                       ContaineredTaskManagerParameters tmParams,
-                       Configuration taskManagerConfig,
-                       String workingDirectory,
-                       Class<?> taskManagerMainClass,
-                       Logger log) throws Exception {
-
-               log.info("Setting up resources for TaskManagers");
-
-               // get and validate all relevant variables
-
-               String remoteFlinkJarPath = 
env.get(YarnConfigKeys.FLINK_JAR_PATH);
-               require(remoteFlinkJarPath != null, "Environment variable %s 
not set", YarnConfigKeys.FLINK_JAR_PATH);
-
-               String appId = env.get(YarnConfigKeys.ENV_APP_ID);
-               require(appId != null, "Environment variable %s not set", 
YarnConfigKeys.ENV_APP_ID);
-
-               String clientHomeDir = 
env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
-               require(clientHomeDir != null, "Environment variable %s not 
set", YarnConfigKeys.ENV_CLIENT_HOME_DIR);
-
-               String shipListString = 
env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
-               require(shipListString != null, "Environment variable %s not 
set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
-
-               String yarnClientUsername = 
env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-               require(yarnClientUsername != null, "Environment variable %s 
not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
-
-               final String remoteKeytabPath = 
env.get(YarnConfigKeys.KEYTAB_PATH);
-               LOG.info("TM:remoteKeytabPath obtained {}", remoteKeytabPath);
-
-               final String remoteKeytabPrincipal = 
env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-               LOG.info("TM:remoteKeytabPrincipal obtained {}", 
remoteKeytabPrincipal);
-
-               final String remoteYarnConfPath = 
env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
-               LOG.info("TM:remoteYarnConfPath obtained {}", 
remoteYarnConfPath);
-
-               final String remoteKrb5Path = 
env.get(YarnConfigKeys.ENV_KRB5_PATH);
-               LOG.info("TM:remotekrb5Path obtained {}", remoteKrb5Path);
-
-               String classPathString = 
env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
-               require(classPathString != null, "Environment variable %s not 
set", YarnConfigKeys.ENV_FLINK_CLASSPATH);
-
-               // obtain a handle to the file system used by YARN
-               final org.apache.hadoop.fs.FileSystem yarnFileSystem;
-               try {
-                       yarnFileSystem = 
org.apache.hadoop.fs.FileSystem.get(yarnConfig);
-               } catch (IOException e) {
-                       throw new Exception("Could not access YARN's default 
file system", e);
-               }
-
-               //register keytab
-               LocalResource keytabResource = null;
-               if(remoteKeytabPath != null) {
-                       LOG.info("Adding keytab {} to the AM container local 
resource bucket", remoteKeytabPath);
-                       keytabResource = Records.newRecord(LocalResource.class);
-                       Path keytabPath = new Path(remoteKeytabPath);
-                       Utils.registerLocalResource(yarnFileSystem, keytabPath, 
keytabResource);
-               }
-
-               //To support Yarn Secure Integration Test Scenario
-               LocalResource yarnConfResource = null;
-               LocalResource krb5ConfResource = null;
-               boolean hasKrb5 = false;
-               if(remoteYarnConfPath != null && remoteKrb5Path != null) {
-                       LOG.info("TM:Adding remoteYarnConfPath {} to the 
container local resource bucket", remoteYarnConfPath);
-                       yarnConfResource = 
Records.newRecord(LocalResource.class);
-                       Path yarnConfPath = new Path(remoteYarnConfPath);
-                       Utils.registerLocalResource(yarnFileSystem, 
yarnConfPath, yarnConfResource);
-
-                       LOG.info("TM:Adding remoteKrb5Path {} to the container 
local resource bucket", remoteKrb5Path);
-                       krb5ConfResource = 
Records.newRecord(LocalResource.class);
-                       Path krb5ConfPath = new Path(remoteKrb5Path);
-                       Utils.registerLocalResource(yarnFileSystem, 
krb5ConfPath, krb5ConfResource);
-
-                       hasKrb5 = true;
-               }
-
-               // register Flink Jar with remote HDFS
-               LocalResource flinkJar = Records.newRecord(LocalResource.class);
-               {
-                       Path remoteJarPath = new Path(remoteFlinkJarPath);
-                       Utils.registerLocalResource(yarnFileSystem, 
remoteJarPath, flinkJar);
-               }
-
-               // register conf with local fs
-               LocalResource flinkConf = 
Records.newRecord(LocalResource.class);
-               {
-                       // write the TaskManager configuration to a local file
-                       final File taskManagerConfigFile = 
-                               new File(workingDirectory, UUID.randomUUID() + 
"-taskmanager-conf.yaml");
-                       LOG.debug("Writing TaskManager configuration to {}", 
taskManagerConfigFile.getAbsolutePath());
-                       BootstrapTools.writeConfiguration(taskManagerConfig, 
taskManagerConfigFile);
-
-                       Utils.setupLocalResource(yarnFileSystem, appId, 
-                               new Path(taskManagerConfigFile.toURI()), 
flinkConf, new Path(clientHomeDir));
-
-                       log.info("Prepared local resource for modified yaml: 
{}", flinkConf);
-               }
-
-               Map<String, LocalResource> taskManagerLocalResources = new 
HashMap<>();
-               taskManagerLocalResources.put("flink.jar", flinkJar);
-               taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
-
-               //To support Yarn Secure Integration Test Scenario
-               if(yarnConfResource != null && krb5ConfResource != null) {
-                       
taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
-                       taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, 
krb5ConfResource);
-               }
-
-               if(keytabResource != null) {
-                       taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, 
keytabResource);
-               }
-
-               // prepare additional files to be shipped
-               for (String pathStr : shipListString.split(",")) {
-                       if (!pathStr.isEmpty()) {
-                               LocalResource resource = 
Records.newRecord(LocalResource.class);
-                               Path path = new Path(pathStr);
-                               Utils.registerLocalResource(yarnFileSystem, 
path, resource);
-                               taskManagerLocalResources.put(path.getName(), 
resource);
-                       }
-               }
-
-               // now that all resources are prepared, we can create the 
launch context
-
-               log.info("Creating container launch context for TaskManagers");
-
-               boolean hasLogback = new File(workingDirectory, 
"logback.xml").exists();
-               boolean hasLog4j = new File(workingDirectory, 
"log4j.properties").exists();
-
-               String launchCommand = 
BootstrapTools.getTaskManagerShellCommand(
-                       flinkConfig, tmParams, ".", 
ApplicationConstants.LOG_DIR_EXPANSION_VAR,
-                       hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
-
-               log.info("Starting TaskManagers with command: " + 
launchCommand);
-
-               ContainerLaunchContext ctx = 
Records.newRecord(ContainerLaunchContext.class);
-               ctx.setCommands(Collections.singletonList(launchCommand));
-               ctx.setLocalResources(taskManagerLocalResources);
-
-               Map<String, String> containerEnv = new HashMap<>();
-               containerEnv.putAll(tmParams.taskManagerEnv());
-
-               // add YARN classpath, etc to the container environment
-               containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
-               Utils.setupYarnClassPath(yarnConfig, containerEnv);
-
-               containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, 
UserGroupInformation.getCurrentUser().getUserName());
-
-               if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
-                       containerEnv.put(YarnConfigKeys.KEYTAB_PATH, 
remoteKeytabPath);
-                       containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, 
remoteKeytabPrincipal);
-               }
-
-               ctx.setEnvironment(containerEnv);
-
-               try (DataOutputBuffer dob = new DataOutputBuffer()) {
-                       LOG.debug("Adding security tokens to Task Manager 
Container launch Context....");
-                       UserGroupInformation user = 
UserGroupInformation.getCurrentUser();
-                       Credentials credentials = user.getCredentials();
-                       credentials.writeTokenStorageToStream(dob);
-                       ByteBuffer securityTokens = 
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-                       ctx.setTokens(securityTokens);
-               }
-               catch (Throwable t) {
-                       log.error("Getting current user info failed when trying 
to launch the container", t);
-               }
-
-               return ctx;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 9b9ea39..ab96441 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -35,10 +35,6 @@ import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -47,29 +43,20 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 import org.apache.flink.util.ExceptionUtils;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Collections;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
-
 /**
  * The yarn implementation of the resource manager. Used when the system is 
started
  * via the resource framework YARN.
@@ -357,7 +344,7 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
                                flinkConfig, "", 0, 1, teRegistrationTimeout);
                LOG.debug("TaskManager configuration: {}", taskManagerConfig);
 
-               ContainerLaunchContext taskExecutorLaunchContext = 
createTaskExecutorContext(
+               ContainerLaunchContext taskExecutorLaunchContext = 
Utils.createTaskExecutorContext(
                                flinkConfig, yarnConfig, ENV,
                                taskManagerParameters, taskManagerConfig,
                                currDir, YarnTaskExecutorRunner.class, LOG);
@@ -371,204 +358,10 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
        }
 
 
-       /**
-        * Creates the launch context, which describes how to bring up a 
TaskExecutor process in
-        * an allocated YARN container.
-        *
-        * <p>This code is extremely YARN specific and registers all the 
resources that the TaskExecutor
-        * needs (such as JAR file, config file, ...) and all environment 
variables in a YARN
-        * container launch context. The launch context then ensures that those 
resources will be
-        * copied into the containers transient working directory.
-        *
-        * @param flinkConfig
-        *               The Flink configuration object.
-        * @param yarnConfig
-        *               The YARN configuration object.
-        * @param env
-        *               The environment variables.
-        * @param tmParams
-        *               The TaskExecutor container memory parameters.
-        * @param taskManagerConfig
-        *               The configuration for the TaskExecutors.
-        * @param workingDirectory
-        *               The current application master container's working 
directory.
-        * @param taskManagerMainClass
-        *               The class with the main method.
-        * @param log
-        *               The logger.
-        *
-        * @return The launch context for the TaskManager processes.
-        *
-        * @throws Exception Thrown if teh launch context could not be created, 
for example if
-        *                                 the resources could not be copied.
-        */
-       private static ContainerLaunchContext createTaskExecutorContext(
-                       Configuration flinkConfig,
-                       YarnConfiguration yarnConfig,
-                       Map<String, String> env,
-                       ContaineredTaskManagerParameters tmParams,
-                       Configuration taskManagerConfig,
-                       String workingDirectory,
-                       Class<?> taskManagerMainClass,
-                       Logger log) throws Exception {
-
-               // get and validate all relevant variables
-
-               String remoteFlinkJarPath = 
env.get(YarnConfigKeys.FLINK_JAR_PATH);
-               
-               String appId = env.get(YarnConfigKeys.ENV_APP_ID);
-
-               String clientHomeDir = 
env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
-
-               String shipListString = 
env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
-
-               String yarnClientUsername = 
env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-
-               final String remoteKeytabPath = 
env.get(YarnConfigKeys.KEYTAB_PATH);
-               log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
-
-               final String remoteKeytabPrincipal = 
env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-               log.info("TM:remote keytab principal obtained {}", 
remoteKeytabPrincipal);
-
-               final String remoteYarnConfPath = 
env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
-               log.info("TM:remote yarn conf path obtained {}", 
remoteYarnConfPath);
-
-               final String remoteKrb5Path = 
env.get(YarnConfigKeys.ENV_KRB5_PATH);
-               log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
-
-               String classPathString = 
env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
-
-               // obtain a handle to the file system used by YARN
-               final org.apache.hadoop.fs.FileSystem yarnFileSystem;
-               try {
-                       yarnFileSystem = 
org.apache.hadoop.fs.FileSystem.get(yarnConfig);
-               } catch (IOException e) {
-                       throw new Exception("Could not access YARN's default 
file system", e);
-               }
-
-               //register keytab
-               LocalResource keytabResource = null;
-               if(remoteKeytabPath != null) {
-                       log.info("Adding keytab {} to the AM container local 
resource bucket", remoteKeytabPath);
-                       keytabResource = Records.newRecord(LocalResource.class);
-                       Path keytabPath = new Path(remoteKeytabPath);
-                       Utils.registerLocalResource(yarnFileSystem, keytabPath, 
keytabResource);
-               }
-
-               //To support Yarn Secure Integration Test Scenario
-               LocalResource yarnConfResource = null;
-               LocalResource krb5ConfResource = null;
-               boolean hasKrb5 = false;
-               if(remoteYarnConfPath != null && remoteKrb5Path != null) {
-                       log.info("TM:Adding remoteYarnConfPath {} to the 
container local resource bucket", remoteYarnConfPath);
-                       yarnConfResource = 
Records.newRecord(LocalResource.class);
-                       Path yarnConfPath = new Path(remoteYarnConfPath);
-                       Utils.registerLocalResource(yarnFileSystem, 
yarnConfPath, yarnConfResource);
-
-                       log.info("TM:Adding remoteKrb5Path {} to the container 
local resource bucket", remoteKrb5Path);
-                       krb5ConfResource = 
Records.newRecord(LocalResource.class);
-                       Path krb5ConfPath = new Path(remoteKrb5Path);
-                       Utils.registerLocalResource(yarnFileSystem, 
krb5ConfPath, krb5ConfResource);
-
-                       hasKrb5 = true;
-               }
-
-               // register Flink Jar with remote HDFS
-               LocalResource flinkJar = Records.newRecord(LocalResource.class);
-               {
-                       Path remoteJarPath = new Path(remoteFlinkJarPath);
-                       Utils.registerLocalResource(yarnFileSystem, 
remoteJarPath, flinkJar);
-               }
-
-               // register conf with local fs
-               LocalResource flinkConf = 
Records.newRecord(LocalResource.class);
-               {
-                       // write the TaskManager configuration to a local file
-                       final File taskManagerConfigFile =
-                                       new File(workingDirectory, 
UUID.randomUUID() + "-taskmanager-conf.yaml");
-                       log.debug("Writing TaskManager configuration to {}", 
taskManagerConfigFile.getAbsolutePath());
-                       BootstrapTools.writeConfiguration(taskManagerConfig, 
taskManagerConfigFile);
-
-                       Utils.setupLocalResource(yarnFileSystem, appId,
-                                       new 
Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
 
-                       log.info("Prepared local resource for modified yaml: 
{}", flinkConf);
-               }
-
-               Map<String, LocalResource> taskManagerLocalResources = new 
HashMap<>();
-               taskManagerLocalResources.put("flink.jar", flinkJar);
-               taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
-
-               //To support Yarn Secure Integration Test Scenario
-               if(yarnConfResource != null && krb5ConfResource != null) {
-                       
taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
-                       taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, 
krb5ConfResource);
-               }
-
-               if(keytabResource != null) {
-                       taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, 
keytabResource);
-               }
-
-               // prepare additional files to be shipped
-               for (String pathStr : shipListString.split(",")) {
-                       if (!pathStr.isEmpty()) {
-                               LocalResource resource = 
Records.newRecord(LocalResource.class);
-                               Path path = new Path(pathStr);
-                               Utils.registerLocalResource(yarnFileSystem, 
path, resource);
-                               taskManagerLocalResources.put(path.getName(), 
resource);
-                       }
-               }
-
-               // now that all resources are prepared, we can create the 
launch context
-
-               log.info("Creating container launch context for TaskManagers");
-
-               boolean hasLogback = new File(workingDirectory, 
"logback.xml").exists();
-               boolean hasLog4j = new File(workingDirectory, 
"log4j.properties").exists();
-
-               String launchCommand = 
BootstrapTools.getTaskManagerShellCommand(
-                               flinkConfig, tmParams, ".", 
ApplicationConstants.LOG_DIR_EXPANSION_VAR,
-                               hasLogback, hasLog4j, hasKrb5, 
taskManagerMainClass);
-
-               log.info("Starting TaskManagers with command: " + 
launchCommand);
-
-               ContainerLaunchContext ctx = 
Records.newRecord(ContainerLaunchContext.class);
-               ctx.setCommands(Collections.singletonList(launchCommand));
-               ctx.setLocalResources(taskManagerLocalResources);
-
-               Map<String, String> containerEnv = new HashMap<>();
-               containerEnv.putAll(tmParams.taskManagerEnv());
-
-               // add YARN classpath, etc to the container environment
-               containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
-               Utils.setupYarnClassPath(yarnConfig, containerEnv);
-
-               containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, 
UserGroupInformation.getCurrentUser().getUserName());
-
-               if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
-                       containerEnv.put(YarnConfigKeys.KEYTAB_PATH, 
remoteKeytabPath);
-                       containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, 
remoteKeytabPrincipal);
-               }
-
-               ctx.setEnvironment(containerEnv);
-
-               try (DataOutputBuffer dob = new DataOutputBuffer()) {
-                       log.debug("Adding security tokens to Task Executor 
Container launch Context....");
-                       UserGroupInformation user = 
UserGroupInformation.getCurrentUser();
-                       Credentials credentials = user.getCredentials();
-                       credentials.writeTokenStorageToStream(dob);
-                       ByteBuffer securityTokens = 
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-                       ctx.setTokens(securityTokens);
-               }
-               catch (Throwable t) {
-                       log.error("Getting current user info failed when trying 
to launch the container", t);
-               }
-
-               return ctx;
-       }
        
        /**
-        * Generate priority by given resouce profile. 
+        * Generate priority by given resource profile.
         * Priority is only used for distinguishing request of different 
resource.
         * @param resourceProfile The resource profile of a request
         * @return The priority of this resource profile.

http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
new file mode 100644
index 0000000..8534ba8
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
+import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class UtilsTest extends TestLogger {
+
+       private static ActorSystem system;
+
+       @BeforeClass
+       public static void setup() {
+               system = AkkaUtils.createLocalActorSystem(new Configuration());
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(system);
+       }
+
+       @Test
+       public void testYarnFlinkResourceManagerJobManagerLostLeadership() 
throws Exception {
+               new JavaTestKit(system) {{
+
+                       final Deadline deadline = new FiniteDuration(3, 
TimeUnit.MINUTES).fromNow();
+
+                       Configuration flinkConfig = new Configuration();
+                       YarnConfiguration yarnConfig = new YarnConfiguration();
+                       TestingLeaderRetrievalService leaderRetrievalService = 
new TestingLeaderRetrievalService();
+                       String applicationMasterHostName = "localhost";
+                       String webInterfaceURL = "foobar";
+                       ContaineredTaskManagerParameters taskManagerParameters 
= new ContaineredTaskManagerParameters(
+                               1l, 1l, 1l, 1, new HashMap<String, String>());
+                       ContainerLaunchContext taskManagerLaunchContext = 
mock(ContainerLaunchContext.class);
+                       int yarnHeartbeatIntervalMillis = 1000;
+                       int maxFailedContainers = 10;
+                       int numInitialTaskManagers = 5;
+                       final YarnResourceManagerCallbackHandler 
callbackHandler = new YarnResourceManagerCallbackHandler();
+                       AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient = mock(AMRMClientAsync.class);
+                       NMClient nodeManagerClient = mock(NMClient.class);
+                       UUID leaderSessionID = UUID.randomUUID();
+
+                       final List<Container> containerList = new ArrayList<>();
+
+                       for (int i = 0; i < numInitialTaskManagers; i++) {
+                               containerList.add(new 
TestingContainer("container_" + i, "localhost"));
+                       }
+
+                       doAnswer(new Answer() {
+                               int counter = 0;
+                               @Override
+                               public Object answer(InvocationOnMock 
invocation) throws Throwable {
+                                       if (counter < containerList.size()) {
+                                               
callbackHandler.onContainersAllocated(
+                                                       
Collections.singletonList(
+                                                               
containerList.get(counter++)
+                                                       ));
+                                       }
+                                       return null;
+                               }
+                       
}).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
+
+                       ActorRef resourceManager = null;
+                       ActorRef leader1;
+
+                       try {
+                               leader1 = system.actorOf(
+                                       Props.create(
+                                               
TestingUtils.ForwardingActor.class,
+                                               getRef(),
+                                               Option.apply(leaderSessionID)
+                                       ));
+
+                               resourceManager = system.actorOf(
+                                       Props.create(
+                                               
TestingYarnFlinkResourceManager.class,
+                                               flinkConfig,
+                                               yarnConfig,
+                                               leaderRetrievalService,
+                                               applicationMasterHostName,
+                                               webInterfaceURL,
+                                               taskManagerParameters,
+                                               taskManagerLaunchContext,
+                                               yarnHeartbeatIntervalMillis,
+                                               maxFailedContainers,
+                                               numInitialTaskManagers,
+                                               callbackHandler,
+                                               resourceManagerClient,
+                                               nodeManagerClient
+                                       ));
+
+                               
leaderRetrievalService.notifyListener(leader1.path().toString(), 
leaderSessionID);
+
+                               final AkkaActorGateway leader1Gateway = new 
AkkaActorGateway(leader1, leaderSessionID);
+                               final AkkaActorGateway resourceManagerGateway = 
new AkkaActorGateway(resourceManager, leaderSessionID);
+
+                               doAnswer(new Answer() {
+                                       @Override
+                                       public Object answer(InvocationOnMock 
invocation) throws Throwable {
+                                               Container container = 
(Container) invocation.getArguments()[0];
+                                               resourceManagerGateway.tell(new 
NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
+                                                       leader1Gateway);
+                                               return null;
+                                       }
+                               
}).when(nodeManagerClient).startContainer(Matchers.any(Container.class), 
Matchers.any(ContainerLaunchContext.class));
+
+                               expectMsgClass(deadline.timeLeft(), 
RegisterResourceManager.class);
+
+                               resourceManagerGateway.tell(new 
RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
+
+                               for (int i = 0; i < containerList.size(); i++) {
+                                       expectMsgClass(deadline.timeLeft(), 
Acknowledge.class);
+                               }
+
+                               Future<Object> taskManagerRegisteredFuture = 
resourceManagerGateway.ask(new 
NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft());
+
+                               Await.ready(taskManagerRegisteredFuture, 
deadline.timeLeft());
+
+                               leaderRetrievalService.notifyListener(null, 
null);
+
+                               
leaderRetrievalService.notifyListener(leader1.path().toString(), 
leaderSessionID);
+
+                               expectMsgClass(deadline.timeLeft(), 
RegisterResourceManager.class);
+
+                               resourceManagerGateway.tell(new 
RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
+
+                               for (Container container: containerList) {
+                                       resourceManagerGateway.tell(
+                                               new 
NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
+                                               leader1Gateway);
+                               }
+
+                               for (int i = 0; i < containerList.size(); i++) {
+                                       expectMsgClass(deadline.timeLeft(), 
Acknowledge.class);
+                               }
+
+                               Future<Object> 
numberOfRegisteredResourcesFuture = 
resourceManagerGateway.ask(RequestNumberOfRegisteredResources.Instance, 
deadline.timeLeft());
+
+                               int numberOfRegisteredResources = (Integer) 
Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
+
+                               assertEquals(numInitialTaskManagers, 
numberOfRegisteredResources);
+                       } finally {
+                               if (resourceManager != null) {
+                                       
resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+                               }
+                       }
+               }};
+       }
+
+       static class TestingContainer extends Container {
+
+               private final String id;
+               private final String host;
+
+               TestingContainer(String id, String host) {
+                       this.id = id;
+                       this.host = host;
+               }
+
+               @Override
+               public ContainerId getId() {
+                       ContainerId containerId = mock(ContainerId.class);
+                       when(containerId.toString()).thenReturn(id);
+
+                       return containerId;
+               }
+
+               @Override
+               public void setId(ContainerId containerId) {
+
+               }
+
+               @Override
+               public NodeId getNodeId() {
+                       NodeId nodeId = mock(NodeId.class);
+                       when(nodeId.getHost()).thenReturn(host);
+
+                       return nodeId;
+               }
+
+               @Override
+               public void setNodeId(NodeId nodeId) {
+
+               }
+
+               @Override
+               public String getNodeHttpAddress() {
+                       return null;
+               }
+
+               @Override
+               public void setNodeHttpAddress(String s) {
+
+               }
+
+               @Override
+               public Resource getResource() {
+                       return null;
+               }
+
+               @Override
+               public void setResource(Resource resource) {
+
+               }
+
+               @Override
+               public Priority getPriority() {
+                       return null;
+               }
+
+               @Override
+               public void setPriority(Priority priority) {
+
+               }
+
+               @Override
+               public Token getContainerToken() {
+                       return null;
+               }
+
+               @Override
+               public void setContainerToken(Token token) {
+
+               }
+
+               @Override
+               public int compareTo(Container o) {
+                       return 0;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
new file mode 100644
index 0000000..f874896
--- /dev/null
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+import static org.apache.flink.yarn.YarnConfigKeys.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+public class YarnApplicationMasterRunnerTest {
+       private static final Logger LOG = 
LoggerFactory.getLogger(YarnApplicationMasterRunnerTest.class);
+
+       @Rule
+       public TemporaryFolder folder = new TemporaryFolder();
+
+       @Test
+       public void testCreateTaskExecutorContext() throws Exception {
+               File root = folder.getRoot();
+               File home = new File(root, "home");
+               boolean created = home.mkdir();
+               assertTrue(created);
+
+               Answer<?> getDefault = new Answer<Object>() {
+                       @Override
+                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                               return invocationOnMock.getArguments()[1];
+                       }
+               };
+               Configuration flinkConf = new Configuration();
+               YarnConfiguration yarnConf = mock(YarnConfiguration.class);
+               doAnswer(getDefault).when(yarnConf).get(anyString(), 
anyString());
+               doAnswer(getDefault).when(yarnConf).getInt(anyString(), 
anyInt());
+               doAnswer(new Answer() {
+                       @Override
+                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                               return new String[] {(String) 
invocationOnMock.getArguments()[1]};
+                       }
+               }).when(yarnConf).getStrings(anyString(), Mockito.<String> 
anyVararg());
+
+               Map<String, String> env = ImmutableMap. <String, String> 
builder()
+                       .put(ENV_APP_ID, "foo")
+                       .put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath())
+                       .put(ENV_CLIENT_SHIP_FILES, "")
+                       .put(ENV_FLINK_CLASSPATH, "")
+                       .put(ENV_HADOOP_USER_NAME, "foo")
+                       .put(FLINK_JAR_PATH, root.toURI().toString())
+                       .build();
+               ContaineredTaskManagerParameters tmParams = 
mock(ContaineredTaskManagerParameters.class);
+               Configuration taskManagerConf = new Configuration();
+
+               String workingDirectory = root.getAbsolutePath();
+               Class<?> taskManagerMainClass = 
YarnApplicationMasterRunnerTest.class;
+               ContainerLaunchContext ctx = 
Utils.createTaskExecutorContext(flinkConf, yarnConf, env, tmParams,
+                       taskManagerConf, workingDirectory, 
taskManagerMainClass, LOG);
+               assertEquals("file", 
ctx.getLocalResources().get("flink.jar").getResource().getScheme());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
deleted file mode 100644
index a3ff6c4..0000000
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
-import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class YarnFlinkResourceManagerTest extends TestLogger {
-
-       private static ActorSystem system;
-
-       @BeforeClass
-       public static void setup() {
-               system = AkkaUtils.createLocalActorSystem(new Configuration());
-       }
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(system);
-       }
-
-       @Test
-       public void testYarnFlinkResourceManagerJobManagerLostLeadership() 
throws Exception {
-               new JavaTestKit(system) {{
-
-                       final Deadline deadline = new FiniteDuration(3, 
TimeUnit.MINUTES).fromNow();
-
-                       Configuration flinkConfig = new Configuration();
-                       YarnConfiguration yarnConfig = new YarnConfiguration();
-                       TestingLeaderRetrievalService leaderRetrievalService = 
new TestingLeaderRetrievalService();
-                       String applicationMasterHostName = "localhost";
-                       String webInterfaceURL = "foobar";
-                       ContaineredTaskManagerParameters taskManagerParameters 
= new ContaineredTaskManagerParameters(
-                               1l, 1l, 1l, 1, new HashMap<String, String>());
-                       ContainerLaunchContext taskManagerLaunchContext = 
mock(ContainerLaunchContext.class);
-                       int yarnHeartbeatIntervalMillis = 1000;
-                       int maxFailedContainers = 10;
-                       int numInitialTaskManagers = 5;
-                       final YarnResourceManagerCallbackHandler 
callbackHandler = new YarnResourceManagerCallbackHandler();
-                       AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient = mock(AMRMClientAsync.class);
-                       NMClient nodeManagerClient = mock(NMClient.class);
-                       UUID leaderSessionID = UUID.randomUUID();
-
-                       final List<Container> containerList = new ArrayList<>();
-
-                       for (int i = 0; i < numInitialTaskManagers; i++) {
-                               containerList.add(new 
TestingContainer("container_" + i, "localhost"));
-                       }
-
-                       doAnswer(new Answer() {
-                               int counter = 0;
-                               @Override
-                               public Object answer(InvocationOnMock 
invocation) throws Throwable {
-                                       if (counter < containerList.size()) {
-                                               
callbackHandler.onContainersAllocated(
-                                                       
Collections.singletonList(
-                                                               
containerList.get(counter++)
-                                                       ));
-                                       }
-                                       return null;
-                               }
-                       
}).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
-
-                       ActorRef resourceManager = null;
-                       ActorRef leader1;
-
-                       try {
-                               leader1 = system.actorOf(
-                                       Props.create(
-                                               
TestingUtils.ForwardingActor.class,
-                                               getRef(),
-                                               Option.apply(leaderSessionID)
-                                       ));
-
-                               resourceManager = system.actorOf(
-                                       Props.create(
-                                               
TestingYarnFlinkResourceManager.class,
-                                               flinkConfig,
-                                               yarnConfig,
-                                               leaderRetrievalService,
-                                               applicationMasterHostName,
-                                               webInterfaceURL,
-                                               taskManagerParameters,
-                                               taskManagerLaunchContext,
-                                               yarnHeartbeatIntervalMillis,
-                                               maxFailedContainers,
-                                               numInitialTaskManagers,
-                                               callbackHandler,
-                                               resourceManagerClient,
-                                               nodeManagerClient
-                                       ));
-
-                               
leaderRetrievalService.notifyListener(leader1.path().toString(), 
leaderSessionID);
-
-                               final AkkaActorGateway leader1Gateway = new 
AkkaActorGateway(leader1, leaderSessionID);
-                               final AkkaActorGateway resourceManagerGateway = 
new AkkaActorGateway(resourceManager, leaderSessionID);
-
-                               doAnswer(new Answer() {
-                                       @Override
-                                       public Object answer(InvocationOnMock 
invocation) throws Throwable {
-                                               Container container = 
(Container) invocation.getArguments()[0];
-                                               resourceManagerGateway.tell(new 
NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
-                                                       leader1Gateway);
-                                               return null;
-                                       }
-                               
}).when(nodeManagerClient).startContainer(Matchers.any(Container.class), 
Matchers.any(ContainerLaunchContext.class));
-
-                               expectMsgClass(deadline.timeLeft(), 
RegisterResourceManager.class);
-
-                               resourceManagerGateway.tell(new 
RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
-
-                               for (int i = 0; i < containerList.size(); i++) {
-                                       expectMsgClass(deadline.timeLeft(), 
Acknowledge.class);
-                               }
-
-                               Future<Object> taskManagerRegisteredFuture = 
resourceManagerGateway.ask(new 
NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft());
-
-                               Await.ready(taskManagerRegisteredFuture, 
deadline.timeLeft());
-
-                               leaderRetrievalService.notifyListener(null, 
null);
-
-                               
leaderRetrievalService.notifyListener(leader1.path().toString(), 
leaderSessionID);
-
-                               expectMsgClass(deadline.timeLeft(), 
RegisterResourceManager.class);
-
-                               resourceManagerGateway.tell(new 
RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
-
-                               for (Container container: containerList) {
-                                       resourceManagerGateway.tell(
-                                               new 
NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
-                                               leader1Gateway);
-                               }
-
-                               for (int i = 0; i < containerList.size(); i++) {
-                                       expectMsgClass(deadline.timeLeft(), 
Acknowledge.class);
-                               }
-
-                               Future<Object> 
numberOfRegisteredResourcesFuture = 
resourceManagerGateway.ask(RequestNumberOfRegisteredResources.Instance, 
deadline.timeLeft());
-
-                               int numberOfRegisteredResources = (Integer) 
Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
-
-                               assertEquals(numInitialTaskManagers, 
numberOfRegisteredResources);
-                       } finally {
-                               if (resourceManager != null) {
-                                       
resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
-                               }
-                       }
-               }};
-       }
-
-       static class TestingContainer extends Container {
-
-               private final String id;
-               private final String host;
-
-               TestingContainer(String id, String host) {
-                       this.id = id;
-                       this.host = host;
-               }
-
-               @Override
-               public ContainerId getId() {
-                       ContainerId containerId = mock(ContainerId.class);
-                       when(containerId.toString()).thenReturn(id);
-
-                       return containerId;
-               }
-
-               @Override
-               public void setId(ContainerId containerId) {
-
-               }
-
-               @Override
-               public NodeId getNodeId() {
-                       NodeId nodeId = mock(NodeId.class);
-                       when(nodeId.getHost()).thenReturn(host);
-
-                       return nodeId;
-               }
-
-               @Override
-               public void setNodeId(NodeId nodeId) {
-
-               }
-
-               @Override
-               public String getNodeHttpAddress() {
-                       return null;
-               }
-
-               @Override
-               public void setNodeHttpAddress(String s) {
-
-               }
-
-               @Override
-               public Resource getResource() {
-                       return null;
-               }
-
-               @Override
-               public void setResource(Resource resource) {
-
-               }
-
-               @Override
-               public Priority getPriority() {
-                       return null;
-               }
-
-               @Override
-               public void setPriority(Priority priority) {
-
-               }
-
-               @Override
-               public Token getContainerToken() {
-                       return null;
-               }
-
-               @Override
-               public void setContainerToken(Token token) {
-
-               }
-
-               @Override
-               public int compareTo(Container o) {
-                       return 0;
-               }
-       }
-}

Reply via email to