[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5896#discussion_r185606093 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java --- @@ -73,61 +105,68 @@ public static void main(String[] args) { SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); - run(args); + try { + SecurityUtils.getInstalledContext().runSecured( + YarnTaskExecutorRunnerFactory.create(System.getenv())); + } catch (Exception e) { + LOG.error("Exception occurred while launching Task Executor runner", e); + throw new RuntimeException(e); + } } /** -* The instance entry point for the YARN task executor. Obtains user group information and calls -* the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a -* privileged action. +* Creates a {@link YarnTaskExecutorRunnerFactory.Runner}. * -* @param args The command line arguments. +* @param envs environment variables. */ - private static void run(String[] args) { - try { - LOG.debug("All environment variables: {}", ENV); + @VisibleForTesting + protected static Runner create(Mapenvs) { + LOG.debug("All environment variables: {}", envs); - final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); - final String localDirs = ENV.get(Environment.LOCAL_DIRS.key()); - LOG.info("Current working/local Directory: {}", localDirs); + final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + final String localDirs = envs.get(Environment.LOCAL_DIRS.key()); + LOG.info("Current working/local Directory: {}", localDirs); - final String currDir = ENV.get(Environment.PWD.key()); - LOG.info("Current working Directory: {}", currDir); + final String currDir = envs.get(Environment.PWD.key()); + LOG.info("Current working Directory: {}", currDir); - final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); - LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath); + final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - - final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); + final Configuration configuration; + try { + configuration = GlobalConfiguration.loadConfiguration(currDir); FileSystem.initialize(configuration); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + return null; + } - // configure local directory - if (configuration.contains(CoreOptions.TMP_DIRS)) { - LOG.info("Overriding YARN's temporary file directories with those " + - "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); - } - else { - LOG.info("Setting directories for temporary files to: {}", localDirs); - configuration.setString(CoreOptions.TMP_DIRS, localDirs); - } - - // tell akka to die in case of an error - configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); + // configure local directory + if (configuration.contains(CoreOptions.TMP_DIRS)) { + LOG.info("Overriding YARN's temporary file directories with those " + + "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); + } + else { + LOG.info("Setting directories for temporary files to: {}", localDirs); + configuration.setString(CoreOptions.TMP_DIRS, localDirs); + } - String keytabPath = null; - if (remoteKeytabPath
[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5896#discussion_r185422695 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java --- @@ -73,61 +105,68 @@ public static void main(String[] args) { SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); - run(args); + try { + SecurityUtils.getInstalledContext().runSecured( + YarnTaskExecutorRunnerFactory.create(System.getenv())); + } catch (Exception e) { + LOG.error("Exception occurred while launching Task Executor runner", e); + throw new RuntimeException(e); + } } /** -* The instance entry point for the YARN task executor. Obtains user group information and calls -* the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a -* privileged action. +* Creates a {@link YarnTaskExecutorRunnerFactory.Runner}. * -* @param args The command line arguments. +* @param envs environment variables. */ - private static void run(String[] args) { - try { - LOG.debug("All environment variables: {}", ENV); + @VisibleForTesting + protected static Runner create(Mapenvs) { + LOG.debug("All environment variables: {}", envs); - final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); - final String localDirs = ENV.get(Environment.LOCAL_DIRS.key()); - LOG.info("Current working/local Directory: {}", localDirs); + final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + final String localDirs = envs.get(Environment.LOCAL_DIRS.key()); + LOG.info("Current working/local Directory: {}", localDirs); - final String currDir = ENV.get(Environment.PWD.key()); - LOG.info("Current working Directory: {}", currDir); + final String currDir = envs.get(Environment.PWD.key()); + LOG.info("Current working Directory: {}", currDir); - final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); - LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath); + final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - - final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); + final Configuration configuration; + try { + configuration = GlobalConfiguration.loadConfiguration(currDir); FileSystem.initialize(configuration); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + return null; + } - // configure local directory - if (configuration.contains(CoreOptions.TMP_DIRS)) { - LOG.info("Overriding YARN's temporary file directories with those " + - "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); - } - else { - LOG.info("Setting directories for temporary files to: {}", localDirs); - configuration.setString(CoreOptions.TMP_DIRS, localDirs); - } - - // tell akka to die in case of an error - configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); + // configure local directory + if (configuration.contains(CoreOptions.TMP_DIRS)) { + LOG.info("Overriding YARN's temporary file directories with those " + + "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); + } + else { + LOG.info("Setting directories for temporary files to: {}", localDirs); + configuration.setString(CoreOptions.TMP_DIRS, localDirs); + } - String keytabPath = null; - if (remoteKeytabPath
[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5896#discussion_r185134529 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java --- @@ -73,61 +105,68 @@ public static void main(String[] args) { SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); - run(args); + try { + SecurityUtils.getInstalledContext().runSecured( + YarnTaskExecutorRunnerFactory.create(System.getenv())); + } catch (Exception e) { + LOG.error("Exception occurred while launching Task Executor runner", e); + throw new RuntimeException(e); + } } /** -* The instance entry point for the YARN task executor. Obtains user group information and calls -* the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a -* privileged action. +* Creates a {@link YarnTaskExecutorRunnerFactory.Runner}. * -* @param args The command line arguments. +* @param envs environment variables. */ - private static void run(String[] args) { - try { - LOG.debug("All environment variables: {}", ENV); + @VisibleForTesting + protected static Runner create(Mapenvs) { + LOG.debug("All environment variables: {}", envs); - final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); - final String localDirs = ENV.get(Environment.LOCAL_DIRS.key()); - LOG.info("Current working/local Directory: {}", localDirs); + final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + final String localDirs = envs.get(Environment.LOCAL_DIRS.key()); + LOG.info("Current working/local Directory: {}", localDirs); - final String currDir = ENV.get(Environment.PWD.key()); - LOG.info("Current working Directory: {}", currDir); + final String currDir = envs.get(Environment.PWD.key()); + LOG.info("Current working Directory: {}", currDir); - final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); - LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath); + final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - - final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); + final Configuration configuration; + try { + configuration = GlobalConfiguration.loadConfiguration(currDir); FileSystem.initialize(configuration); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); --- End diff -- Good point. Added exceptions to method signature and let caller handle it. ---
[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5896#discussion_r185135031 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java --- @@ -73,61 +105,68 @@ public static void main(String[] args) { SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); - run(args); + try { + SecurityUtils.getInstalledContext().runSecured( + YarnTaskExecutorRunnerFactory.create(System.getenv())); + } catch (Exception e) { + LOG.error("Exception occurred while launching Task Executor runner", e); + throw new RuntimeException(e); + } } /** -* The instance entry point for the YARN task executor. Obtains user group information and calls -* the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a -* privileged action. +* Creates a {@link YarnTaskExecutorRunnerFactory.Runner}. * -* @param args The command line arguments. +* @param envs environment variables. */ - private static void run(String[] args) { - try { - LOG.debug("All environment variables: {}", ENV); + @VisibleForTesting + protected static Runner create(Mapenvs) { + LOG.debug("All environment variables: {}", envs); - final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); - final String localDirs = ENV.get(Environment.LOCAL_DIRS.key()); - LOG.info("Current working/local Directory: {}", localDirs); + final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + final String localDirs = envs.get(Environment.LOCAL_DIRS.key()); + LOG.info("Current working/local Directory: {}", localDirs); - final String currDir = ENV.get(Environment.PWD.key()); - LOG.info("Current working Directory: {}", currDir); + final String currDir = envs.get(Environment.PWD.key()); + LOG.info("Current working Directory: {}", currDir); - final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); - LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath); + final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - - final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); + final Configuration configuration; + try { + configuration = GlobalConfiguration.loadConfiguration(currDir); FileSystem.initialize(configuration); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + return null; + } - // configure local directory - if (configuration.contains(CoreOptions.TMP_DIRS)) { - LOG.info("Overriding YARN's temporary file directories with those " + - "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); - } - else { - LOG.info("Setting directories for temporary files to: {}", localDirs); - configuration.setString(CoreOptions.TMP_DIRS, localDirs); - } - - // tell akka to die in case of an error - configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); + // configure local directory + if (configuration.contains(CoreOptions.TMP_DIRS)) { + LOG.info("Overriding YARN's temporary file directories with those " + + "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); + } + else { + LOG.info("Setting directories for temporary files to: {}", localDirs); + configuration.setString(CoreOptions.TMP_DIRS, localDirs); + } - String keytabPath = null; - if (remoteKeytabPath
[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5896#discussion_r185052704 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java --- @@ -73,61 +105,68 @@ public static void main(String[] args) { SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); - run(args); + try { + SecurityUtils.getInstalledContext().runSecured( + YarnTaskExecutorRunnerFactory.create(System.getenv())); + } catch (Exception e) { + LOG.error("Exception occurred while launching Task Executor runner", e); + throw new RuntimeException(e); + } } /** -* The instance entry point for the YARN task executor. Obtains user group information and calls -* the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a -* privileged action. +* Creates a {@link YarnTaskExecutorRunnerFactory.Runner}. * -* @param args The command line arguments. +* @param envs environment variables. */ - private static void run(String[] args) { - try { - LOG.debug("All environment variables: {}", ENV); + @VisibleForTesting + protected static Runner create(Mapenvs) { + LOG.debug("All environment variables: {}", envs); - final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); - final String localDirs = ENV.get(Environment.LOCAL_DIRS.key()); - LOG.info("Current working/local Directory: {}", localDirs); + final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + final String localDirs = envs.get(Environment.LOCAL_DIRS.key()); + LOG.info("Current working/local Directory: {}", localDirs); - final String currDir = ENV.get(Environment.PWD.key()); - LOG.info("Current working Directory: {}", currDir); + final String currDir = envs.get(Environment.PWD.key()); + LOG.info("Current working Directory: {}", currDir); - final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); - LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath); + final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - - final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); + final Configuration configuration; + try { + configuration = GlobalConfiguration.loadConfiguration(currDir); FileSystem.initialize(configuration); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + return null; + } - // configure local directory - if (configuration.contains(CoreOptions.TMP_DIRS)) { - LOG.info("Overriding YARN's temporary file directories with those " + - "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); - } - else { - LOG.info("Setting directories for temporary files to: {}", localDirs); - configuration.setString(CoreOptions.TMP_DIRS, localDirs); - } - - // tell akka to die in case of an error - configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); + // configure local directory + if (configuration.contains(CoreOptions.TMP_DIRS)) { + LOG.info("Overriding YARN's temporary file directories with those " + + "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); + } + else { + LOG.info("Setting directories for temporary files to: {}", localDirs); + configuration.setString(CoreOptions.TMP_DIRS, localDirs); + } - String keytabPath = null; - if (remoteKeytabPath
[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5896#discussion_r185050544 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java --- @@ -73,61 +105,68 @@ public static void main(String[] args) { SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); - run(args); + try { + SecurityUtils.getInstalledContext().runSecured( + YarnTaskExecutorRunnerFactory.create(System.getenv())); + } catch (Exception e) { + LOG.error("Exception occurred while launching Task Executor runner", e); + throw new RuntimeException(e); + } } /** -* The instance entry point for the YARN task executor. Obtains user group information and calls -* the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a -* privileged action. +* Creates a {@link YarnTaskExecutorRunnerFactory.Runner}. * -* @param args The command line arguments. +* @param envs environment variables. */ - private static void run(String[] args) { - try { - LOG.debug("All environment variables: {}", ENV); + @VisibleForTesting + protected static Runner create(Mapenvs) { + LOG.debug("All environment variables: {}", envs); - final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); - final String localDirs = ENV.get(Environment.LOCAL_DIRS.key()); - LOG.info("Current working/local Directory: {}", localDirs); + final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + final String localDirs = envs.get(Environment.LOCAL_DIRS.key()); + LOG.info("Current working/local Directory: {}", localDirs); - final String currDir = ENV.get(Environment.PWD.key()); - LOG.info("Current working Directory: {}", currDir); + final String currDir = envs.get(Environment.PWD.key()); + LOG.info("Current working Directory: {}", currDir); - final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); - LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath); + final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - - final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); + final Configuration configuration; + try { + configuration = GlobalConfiguration.loadConfiguration(currDir); FileSystem.initialize(configuration); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); --- End diff -- Why is this exception being swallowed? ---
[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...
GitHub user suez1224 opened a pull request: https://github.com/apache/flink/pull/5896 [FLINK-8286][Security] Fix kerberos security configuration for YarnTaskExecutor ## What is the purpose of the change Fix broken YARN kerberos integration for flip-6. ## Brief change log - Fix kerberos onfigurations. - Refactor code. - Add unittest. ## Verifying this change This change added tests and can be verified as follows: - *Added test that validates kerberos credentials are set correctly * ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no ) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/suez1224/flink FLINK-8286 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5896.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5896 commit dfeb614c5d5ecba21390c4ee7ceefdefa3a48bf1 Author: Shuyi ChenDate: 2018-04-23T00:35:37Z Fix kerberos security configuration for YarnTaskExecutor ---