[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

2018-05-02 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185606093
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java
 ---
@@ -73,61 +105,68 @@ public static void main(String[] args) {
         SignalHandler.register(LOG);
         JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-        run(args);
+        try {
+            SecurityUtils.getInstalledContext().runSecured(
+                YarnTaskExecutorRunnerFactory.create(System.getenv()));
+        } catch (Exception e) {
+            LOG.error("Exception occurred while launching Task Executor runner", e);
+            throw new RuntimeException(e);
+        }
     }
 
     /**
-     * The instance entry point for the YARN task executor. Obtains user group information and calls
-     * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
-     * privileged action.
+     * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
      *
-     * @param args The command line arguments.
+     * @param envs environment variables.
      */
-    private static void run(String[] args) {
-        try {
-            LOG.debug("All environment variables: {}", ENV);
+    @VisibleForTesting
+    protected static Runner create(Map<String, String> envs) {
+        LOG.debug("All environment variables: {}", envs);
 
-            final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-            final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
-            LOG.info("Current working/local Directory: {}", localDirs);
+        final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+        final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+        LOG.info("Current working/local Directory: {}", localDirs);
 
-            final String currDir = ENV.get(Environment.PWD.key());
-            LOG.info("Current working Directory: {}", currDir);
+        final String currDir = envs.get(Environment.PWD.key());
+        LOG.info("Current working Directory: {}", currDir);
 
-            final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
-            LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
+        final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+        LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
 
-            final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-            LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
-
-            final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
+        final Configuration configuration;
+        try {
+            configuration = GlobalConfiguration.loadConfiguration(currDir);
             FileSystem.initialize(configuration);
+        } catch (Throwable t) {
+            LOG.error(t.getMessage(), t);
+            return null;
+        }
 
-            // configure local directory
-            if (configuration.contains(CoreOptions.TMP_DIRS)) {
-                LOG.info("Overriding YARN's temporary file directories with those " +
-                    "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
-            }
-            else {
-                LOG.info("Setting directories for temporary files to: {}", localDirs);
-                configuration.setString(CoreOptions.TMP_DIRS, localDirs);
-            }
-
-            // tell akka to die in case of an error
-            configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+        // configure local directory
+        if (configuration.contains(CoreOptions.TMP_DIRS)) {
+            LOG.info("Overriding YARN's temporary file directories with those " +
+                "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
+        }
+        else {
+            LOG.info("Setting directories for temporary files to: {}", localDirs);
+            configuration.setString(CoreOptions.TMP_DIRS, localDirs);
+        }
 
-            String keytabPath = null;
-            if (remoteKeytabPath 

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

2018-05-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185422695
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java
 ---

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

2018-04-30 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185134529
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java
 ---
@@ -73,61 +105,68 @@ public static void main(String[] args) {
         SignalHandler.register(LOG);
         JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-        run(args);
+        try {
+            SecurityUtils.getInstalledContext().runSecured(
+                YarnTaskExecutorRunnerFactory.create(System.getenv()));
+        } catch (Exception e) {
+            LOG.error("Exception occurred while launching Task Executor runner", e);
+            throw new RuntimeException(e);
+        }
     }
 
     /**
-     * The instance entry point for the YARN task executor. Obtains user group information and calls
-     * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
-     * privileged action.
+     * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
      *
-     * @param args The command line arguments.
+     * @param envs environment variables.
      */
-    private static void run(String[] args) {
-        try {
-            LOG.debug("All environment variables: {}", ENV);
+    @VisibleForTesting
+    protected static Runner create(Map<String, String> envs) {
+        LOG.debug("All environment variables: {}", envs);
 
-            final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-            final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
-            LOG.info("Current working/local Directory: {}", localDirs);
+        final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+        final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+        LOG.info("Current working/local Directory: {}", localDirs);
 
-            final String currDir = ENV.get(Environment.PWD.key());
-            LOG.info("Current working Directory: {}", currDir);
+        final String currDir = envs.get(Environment.PWD.key());
+        LOG.info("Current working Directory: {}", currDir);
 
-            final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
-            LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
+        final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+        LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
 
-            final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-            LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
-
-            final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
+        final Configuration configuration;
+        try {
+            configuration = GlobalConfiguration.loadConfiguration(currDir);
             FileSystem.initialize(configuration);
+        } catch (Throwable t) {
+            LOG.error(t.getMessage(), t);
--- End diff --

Good point. Added exceptions to method signature and let caller handle it.
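
The change described here can be sketched roughly as follows. This is a minimal illustration under assumptions, not the code from the pull request: the class name `PropagatingFactorySketch`, the `Callable<Integer>` stand-in for the runner, and the `"PWD"` lookup are all hypothetical. The factory declares its exceptions, and the caller logs once and fails the launch, much as `main` does in the diff above.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical sketch; names only loosely follow the diff above and are not Flink code.
class PropagatingFactorySketch {

    // The factory declares its failure modes instead of logging and returning null.
    static Callable<Integer> create(Map<String, String> envs) throws Exception {
        String currDir = envs.get("PWD"); // assumed key, for illustration only
        if (currDir == null) {
            throw new IllegalStateException("PWD is not set");
        }
        return () -> 0; // placeholder for the real runner
    }

    // The caller decides how to react: log once, then fail the launch.
    static int launch(Map<String, String> envs) {
        try {
            return create(envs).call();
        } catch (Exception e) {
            System.err.println("Exception occurred while launching runner: " + e);
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(launch(Collections.singletonMap("PWD", "/tmp")));
    }
}
```

With this shape the root cause stays attached to the exception that terminates the process, so it is visible to whoever handles the failure.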


---


[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

2018-04-30 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185135031
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java
 ---

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

2018-04-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185052704
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java
 ---

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

2018-04-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185050544
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java
 ---
@@ -73,61 +105,68 @@ public static void main(String[] args) {
         SignalHandler.register(LOG);
         JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-        run(args);
+        try {
+            SecurityUtils.getInstalledContext().runSecured(
+                YarnTaskExecutorRunnerFactory.create(System.getenv()));
+        } catch (Exception e) {
+            LOG.error("Exception occurred while launching Task Executor runner", e);
+            throw new RuntimeException(e);
+        }
     }
 
     /**
-     * The instance entry point for the YARN task executor. Obtains user group information and calls
-     * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
-     * privileged action.
+     * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
      *
-     * @param args The command line arguments.
+     * @param envs environment variables.
      */
-    private static void run(String[] args) {
-        try {
-            LOG.debug("All environment variables: {}", ENV);
+    @VisibleForTesting
+    protected static Runner create(Map<String, String> envs) {
+        LOG.debug("All environment variables: {}", envs);
 
-            final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-            final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
-            LOG.info("Current working/local Directory: {}", localDirs);
+        final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+        final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+        LOG.info("Current working/local Directory: {}", localDirs);
 
-            final String currDir = ENV.get(Environment.PWD.key());
-            LOG.info("Current working Directory: {}", currDir);
+        final String currDir = envs.get(Environment.PWD.key());
+        LOG.info("Current working Directory: {}", currDir);
 
-            final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
-            LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
+        final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+        LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
 
-            final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-            LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
-
-            final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
+        final Configuration configuration;
+        try {
+            configuration = GlobalConfiguration.loadConfiguration(currDir);
             FileSystem.initialize(configuration);
+        } catch (Throwable t) {
+            LOG.error(t.getMessage(), t);
--- End diff --

Why is this exception being swallowed?
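
To illustrate the concern, here is a minimal, self-contained sketch of the pattern in the diff above (log the failure, then return `null`). Everything in it is hypothetical: `SwallowedExceptionSketch`, `loadRunner`, and the `Callable<Integer>` stand-in are not Flink code, and how the real caller reacts to a `null` runner is an assumption. The point is only that a caller using the result unconditionally ends up with a secondary error that no longer names the root cause.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for the pattern under review; not Flink code.
class SwallowedExceptionSketch {

    // Mirrors the reviewed shape: log the failure, then return null.
    static Callable<Integer> createSwallowing(String configDir) {
        try {
            return loadRunner(configDir);
        } catch (Throwable t) {
            System.err.println("error: " + t.getMessage());
            return null; // the caller can no longer tell what went wrong
        }
    }

    // Hypothetical loader that fails when no directory is given.
    private static Callable<Integer> loadRunner(String configDir) {
        if (configDir == null) {
            throw new IllegalStateException("configuration directory is not set");
        }
        return () -> 0;
    }

    // Simulates a caller that invokes the returned runner without a null check.
    static String launch(Callable<Integer> runner) {
        try {
            return "exit code " + runner.call();
        } catch (Exception e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // The original IllegalStateException is gone; only the follow-up error surfaces.
        System.out.println(launch(createSwallowing(null))); // prints "NullPointerException"
    }
}
```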


---


[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

2018-04-23 Thread suez1224
GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/5896

[FLINK-8286][Security] Fix kerberos security configuration for YarnTaskExecutor

## What is the purpose of the change

Fix the broken YARN Kerberos integration for FLIP-6.


## Brief change log

  - Fix Kerberos configurations.
  - Refactor code.
  - Add unit test.

## Verifying this change

This change added tests and can be verified as follows:
  - *Added a test that validates that Kerberos credentials are set correctly*
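
One reason such a test becomes practical is that the factory method in the diff takes the environment as a `Map` rather than reading a static `ENV` field. The sketch below shows the general idea under stated assumptions: `EnvDrivenFactorySketch`, `SecuritySettings`, and the two key names are hypothetical and are not the identifiers used in Flink or in this pull request's test.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; key names and types are illustrative, not Flink's.
class EnvDrivenFactorySketch {

    static final String KEYTAB_PATH_KEY = "EXAMPLE_KEYTAB_PATH";
    static final String KEYTAB_PRINCIPAL_KEY = "EXAMPLE_KEYTAB_PRINCIPAL";

    // Simple holder for the values a security setup would need.
    static final class SecuritySettings {
        final String keytabPath;
        final String principal;

        SecuritySettings(String keytabPath, String principal) {
            this.keytabPath = keytabPath;
            this.principal = principal;
        }
    }

    // Reads only from the supplied map, so a test can pass any environment it likes.
    static SecuritySettings fromEnv(Map<String, String> envs) {
        return new SecuritySettings(envs.get(KEYTAB_PATH_KEY), envs.get(KEYTAB_PRINCIPAL_KEY));
    }

    public static void main(String[] args) {
        Map<String, String> envs = new HashMap<>();
        envs.put(KEYTAB_PATH_KEY, "/tmp/example.keytab");
        envs.put(KEYTAB_PRINCIPAL_KEY, "alice@EXAMPLE.COM");

        SecuritySettings settings = fromEnv(envs);
        System.out.println(settings.principal + " " + settings.keytabPath);
    }
}
```

A unit test can then build the map by hand and assert on the resulting settings without touching the real process environment.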

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink FLINK-8286

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5896.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5896


commit dfeb614c5d5ecba21390c4ee7ceefdefa3a48bf1
Author: Shuyi Chen 
Date:   2018-04-23T00:35:37Z

Fix kerberos security configuration for YarnTaskExecutor




---