Repository: flink
Updated Branches:
  refs/heads/master 303f6fee9 -> 68709b087


[FLINK-3929] additional fixes for keytab security

- load flink-jaas.conf from classpath
- avoid using undocumented flink base dir config entry
- enable test cases to run on MacOS
- unify suffix of secure test cases
- fix error logging and reporting

This closes #2275


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68709b08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68709b08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68709b08

Branch: refs/heads/master
Commit: 68709b087570402cacb7bc3088e0eb35d83c8d32
Parents: 285b6f7
Author: Maximilian Michels <m...@apache.org>
Authored: Tue Sep 20 15:41:35 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Tue Sep 20 22:03:29 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  3 -
 .../src/main/flink-bin/conf/flink-jaas.conf     | 26 --------
 .../flink/runtime/security/SecurityContext.java | 67 ++++++++------------
 .../src/main/resources/flink-jaas.conf          | 26 ++++++++
 .../flink/runtime/taskmanager/TaskManager.scala |  2 -
 .../runtime/security/SecurityContextTest.java   |  4 +-
 .../connectors/fs/RollingSinkSecuredITCase.java |  1 -
 .../kafka/Kafka09SecureRunITCase.java           | 62 ------------------
 .../kafka/Kafka09SecuredRunITCase.java          | 62 ++++++++++++++++++
 .../flink/test/util/SecureTestEnvironment.java  | 23 +++----
 .../org/apache/flink/yarn/YarnTestBase.java     |  3 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  1 -
 .../flink/yarn/YarnTaskManagerRunner.java       |  5 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 32 +++-------
 14 files changed, 138 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 575ffad..0711758 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -161,9 +161,6 @@ public class CliFrontend {
                                "filesystem scheme from configuration.", e);
                }
 
-               this.config.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, 
configDirectory.getAbsolutePath()
-                               + ".." + File.separator);
-
                this.clientTimeout = AkkaUtils.getClientTimeout(config);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/flink-jaas.conf 
b/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
deleted file mode 100644
index d476e24..0000000
--- a/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
+++ /dev/null
@@ -1,26 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-# We are using this file as an workaround for the Kafka and ZK SASL 
implementation
-# since they explicitly look for java.security.auth.login.config property
-# The file itself is not used by the application since the internal 
implementation
-# uses a process-wide in-memory java security configuration object.
-# Please do not edit/delete this file - See FLINK-3929
-sample {
-  useKeyTab=false
-  useTicketCache=true;
-};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index 4b8b69b..be6611f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -22,7 +22,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -33,7 +34,12 @@ import org.slf4j.LoggerFactory;
 
 import javax.security.auth.Subject;
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 
@@ -170,15 +176,12 @@ public class SecurityContext {
         * Kafka current code behavior.
         */
        private static void populateSystemSecurityProperties(Configuration 
configuration) {
+               Preconditions.checkNotNull(configuration, "The supplied 
configuation was null");
 
                //required to be empty for Kafka but we will override the 
property
                //with pseudo JAAS configuration file if SASL auth is enabled 
for ZK
                System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
 
-               if(configuration == null) {
-                       return;
-               }
-
                boolean disableSaslClient = 
configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
                                ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
                if(disableSaslClient) {
@@ -188,46 +191,26 @@ public class SecurityContext {
                        return;
                }
 
-               String baseDir = 
configuration.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
-               if(baseDir == null) {
-                       String message = "SASL auth is enabled for ZK but 
unable to locate pseudo Jaas config " +
-                                       "since " + 
ConfigConstants.FLINK_BASE_DIR_PATH_KEY + " is not provided";
-                       LOG.error(message);
-                       throw new IllegalConfigurationException(message);
-               }
-
-               File f = new File(baseDir);
-               if(!f.exists() || !f.isDirectory()) {
-                       LOG.error("Invalid flink base directory {} 
configuration provided", baseDir);
-                       throw new IllegalConfigurationException("Invalid flink 
base directory configuration provided");
-               }
-
-               File jaasConfigFile = new File(f, JAAS_CONF_FILENAME);
-
-               if (!jaasConfigFile.exists() || !jaasConfigFile.isFile()) {
-
-                       //check if there is a conf directory
-                       File confDir = new File(f, "conf");
-                       if(!confDir.exists() || !confDir.isDirectory()) {
-                               LOG.error("Could not locate " + 
JAAS_CONF_FILENAME);
-                               throw new IllegalConfigurationException("Could 
not locate " + JAAS_CONF_FILENAME);
-                       }
-
-                       jaasConfigFile = new File(confDir, JAAS_CONF_FILENAME);
-
-                       if (!jaasConfigFile.exists() || 
!jaasConfigFile.isFile()) {
-                               LOG.error("Could not locate " + 
JAAS_CONF_FILENAME);
-                               throw new IllegalConfigurationException("Could 
not locate " + JAAS_CONF_FILENAME);
-                       }
+               // load Jaas config file to initialize SASL
+               final File jaasConfFile;
+               try {
+                       Path jaasConfPath = 
Files.createTempFile(JAAS_CONF_FILENAME, "");
+                       InputStream jaasConfStream = 
SecurityContext.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME);
+                       Files.copy(jaasConfStream, jaasConfPath, 
StandardCopyOption.REPLACE_EXISTING);
+                       jaasConfFile = jaasConfPath.toFile();
+                       jaasConfFile.deleteOnExit();
+               } catch (IOException e) {
+                       throw new RuntimeException("SASL auth is enabled for ZK 
but unable to " +
+                               "locate pseudo Jaas config provided with 
Flink", e);
                }
 
                LOG.info("Enabling {} property with pseudo JAAS config file: 
{}",
-                               JAVA_SECURITY_AUTH_LOGIN_CONFIG, 
jaasConfigFile);
+                               JAVA_SECURITY_AUTH_LOGIN_CONFIG, 
jaasConfFile.getAbsolutePath());
 
                //ZK client module lookup the configuration to handle SASL.
                
//https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
-               System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, 
jaasConfigFile.getAbsolutePath());
-               System.setProperty(ZOOKEEPER_SASL_CLIENT,"true");
+               System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, 
jaasConfFile.getAbsolutePath());
+               System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
 
                String zkSaslServiceName = 
configuration.getString(ConfigConstants.ZOOKEEPER_SASL_SERVICE_NAME, null);
                if(!StringUtils.isBlank(zkSaslServiceName)) {
@@ -250,6 +233,10 @@ public class SecurityContext {
 
                String principal;
 
+               public SecurityConfiguration() {
+                       this.flinkConf = 
GlobalConfiguration.loadConfiguration();
+               }
+
                public String getKeytab() {
                        return keytab;
                }
@@ -310,4 +297,4 @@ public class SecurityContext {
                T run() throws Exception;
        }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/resources/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/flink-jaas.conf 
b/flink-runtime/src/main/resources/flink-jaas.conf
new file mode 100644
index 0000000..7f0f06b
--- /dev/null
+++ b/flink-runtime/src/main/resources/flink-jaas.conf
@@ -0,0 +1,26 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# We are using this file as an workaround for the Kafka and ZK SASL 
implementation
+# since they explicitly look for java.security.auth.login.config property
+# The file itself is not used by the application since the internal 
implementation
+# uses a process-wide in-memory java security configuration object.
+# Please do not edit/delete this file - See FLINK-3929
+sample {
+  useKeyTab=false
+  useTicketCache=true;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8534ee1..9e2feb5 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1583,8 +1583,6 @@ object TaskManager {
       }
     }
 
-    conf.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, 
cliConfig.getConfigDir() + "/..")
-
     conf
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
index 5f3d76a..3c48e8f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
@@ -35,7 +35,7 @@ public class SecurityContextTest {
                SecurityContext.SecurityConfiguration sc = new 
SecurityContext.SecurityConfiguration();
                try {
                        SecurityContext.install(sc);
-                       
assertEquals(UserGroupInformation.getLoginUser().getUserName(),getOSUserName());
+                       
assertEquals(UserGroupInformation.getLoginUser().getUserName(), 
getOSUserName());
                } catch (Exception e) {
                        fail(e.getMessage());
                }
@@ -59,7 +59,7 @@ public class SecurityContextTest {
                if( osName.contains( "windows" ) ){
                        className = "com.sun.security.auth.module.NTSystem";
                }
-               else if( osName.contains( "linux" ) ){
+               else if( osName.contains( "linux" ) || osName.contains( "mac" ) 
 ){
                        className = "com.sun.security.auth.module.UnixSystem";
                }
                else if( osName.contains( "solaris" ) || osName.contains( 
"sunos" ) ){

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 930ddd2..051175a 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -227,7 +227,6 @@ public class RollingSinkSecuredITCase extends 
RollingSinkITCase {
                        TestStreamEnvironment.setAsContext(cluster, 
DEFAULT_PARALLELISM);
 
                } catch (Exception e) {
-                       LOG.error("Exception occured while creating MiniFlink 
cluster. Reason: {}", e);
                        throw new RuntimeException(e);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
deleted file mode 100644
index d12ec65..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.test.util.SecureTestEnvironment;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/*
- * Kafka Secure Connection (kerberos) IT test case
- */
-public class Kafka09SecureRunITCase extends KafkaConsumerTestBase {
-
-       protected static final Logger LOG = 
LoggerFactory.getLogger(Kafka09SecureRunITCase.class);
-
-       @BeforeClass
-       public static void prepare() throws IOException, ClassNotFoundException 
{
-               
LOG.info("-------------------------------------------------------------------------");
-               LOG.info("    Starting Kafka09SecureRunITCase ");
-               
LOG.info("-------------------------------------------------------------------------");
-
-               SecureTestEnvironment.prepare(tempFolder);
-               
SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
-
-               startClusters(true);
-       }
-
-       @AfterClass
-       public static void shutDownServices() {
-               shutdownClusters();
-               SecureTestEnvironment.cleanup();
-       }
-
-
-       //timeout interval is large since in Travis, ZK connection timeout 
occurs frequently
-       //The timeout for the test case is 2 times timeout of ZK connection
-       @Test(timeout = 600000)
-       public void testMultipleTopics() throws Exception {
-               runProduceConsumeMultipleTopics();
-       }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
new file mode 100644
index 0000000..e748537
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/*
+ * Kafka Secure Connection (kerberos) IT test case
+ */
+public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
+
+       protected static final Logger LOG = 
LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
+
+       @BeforeClass
+       public static void prepare() throws IOException, ClassNotFoundException 
{
+               
LOG.info("-------------------------------------------------------------------------");
+               LOG.info("    Starting Kafka09SecuredRunITCase ");
+               
LOG.info("-------------------------------------------------------------------------");
+
+               SecureTestEnvironment.prepare(tempFolder);
+               
SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
+
+               startClusters(true);
+       }
+
+       @AfterClass
+       public static void shutDownServices() {
+               shutdownClusters();
+               SecureTestEnvironment.cleanup();
+       }
+
+
+       //timeout interval is large since in Travis, ZK connection timeout 
occurs frequently
+       //The timeout for the test case is 2 times timeout of ZK connection
+       @Test(timeout = 600000)
+       public void testMultipleTopics() throws Exception {
+               runProduceConsumeMultipleTopics();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index 00b19f1..b5e622b 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.util;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.junit.rules.TemporaryFolder;
@@ -60,12 +61,10 @@ public class SecureTestEnvironment {
 
        private static String hadoopServicePrincipal = null;
 
-       private static File baseDirForSecureRun = null;
-
        public static void prepare(TemporaryFolder tempFolder) {
 
                try {
-                       baseDirForSecureRun = tempFolder.newFolder();
+                       File baseDirForSecureRun = tempFolder.newFolder();
                        LOG.info("Base Directory for Secure Environment: {}", 
baseDirForSecureRun);
 
                        String hostName = "localhost";
@@ -113,19 +112,17 @@ public class SecureTestEnvironment {
                        //See Yarn test case module for reference
                        createJaasConfig(baseDirForSecureRun);
                        SecurityContext.SecurityConfiguration ctx = new 
SecurityContext.SecurityConfiguration();
-                       Configuration flinkConfig = new Configuration();
+                       Configuration flinkConfig = 
GlobalConfiguration.loadConfiguration();
                        
flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
                        
flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
                        
flinkConfig.setBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, false);
-                       
flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, 
baseDirForSecureRun.getAbsolutePath());
                        ctx.setFlinkConfiguration(flinkConfig);
                        TestingSecurityContext.install(ctx, 
getClientSecurityConfigurationMap());
 
-                       populateSystemEnvVariables();
+                       populateJavaPropertyVariables();
 
                } catch(Exception e) {
-                       LOG.error("Exception occured while preparing secure 
environment. Reason: {}", e);
-                       throw new RuntimeException(e);
+                       throw new RuntimeException("Exception occured while 
preparing secure environment.", e);
                }
 
        }
@@ -145,14 +142,12 @@ public class SecureTestEnvironment {
                testPrincipal = null;
                testZkServerPrincipal = null;
                hadoopServicePrincipal = null;
-               baseDirForSecureRun = null;
 
        }
 
-       private static void populateSystemEnvVariables() {
+       private static void populateJavaPropertyVariables() {
 
                if(LOG.isDebugEnabled()) {
-                       System.setProperty("FLINK_JAAS_DEBUG", "true");
                        System.setProperty("sun.security.krb5.debug", "true");
                }
 
@@ -165,7 +160,6 @@ public class SecureTestEnvironment {
 
        private static void resetSystemEnvVariables() {
                System.clearProperty("java.security.krb5.conf");
-               System.clearProperty("FLINK_JAAS_DEBUG");
                System.clearProperty("sun.security.krb5.debug");
 
                System.clearProperty("zookeeper.authProvider.1");
@@ -227,7 +221,7 @@ public class SecureTestEnvironment {
        }
 
        /*
-        * Helper method to create a temporary JAAS configuration file to ger 
around the Kafka and ZK SASL
+        * Helper method to create a temporary JAAS configuration file to get 
around the Kafka and ZK SASL
         * implementation lookup java.security.auth.login.config
         */
        private static void  createJaasConfig(File baseDirForSecureRun) {
@@ -241,8 +235,7 @@ public class SecureTestEnvironment {
                        out.println("useTicketCache=true;");
                        out.println("};");
                } catch (IOException e) {
-                       LOG.error("Exception occured while trying to create 
JAAS config. Reason: {}", e.getMessage());
-                       throw new RuntimeException(e);
+                       throw new RuntimeException("Exception occured while 
trying to create JAAS config.", e);
                }
 
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 605aa44..afdd400 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -427,8 +427,7 @@ public abstract class YarnTestBase extends TestLogger {
                                        
out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal);
                                        out.println("");
                                } catch (IOException e) {
-                                       LOG.error("Exception occured while 
trying to append the security configurations. Reason: {}", e.getMessage());
-                                       throw new RuntimeException(e);
+                                       throw new RuntimeException("Exception 
occured while trying to append the security configurations.", e);
                                }
 
                                String configDir = 
tempConfPathForSecureRun.getAbsolutePath();

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index efb658a..b27876b 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -176,7 +176,6 @@ public class YarnApplicationMasterRunner {
                                
flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
                                
flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, 
remoteKeytabPrincipal);
                        }
-                       
flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
 
                        
SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index c70a30b..21ed52e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -127,7 +127,6 @@ public class YarnTaskManagerRunner {
                                
configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
                                
configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, 
remoteKeytabPrincipal);
                        }
-                       
configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
 
                        
SecurityContext.install(sc.setFlinkConfiguration(configuration));
 
@@ -145,9 +144,9 @@ public class YarnTaskManagerRunner {
                                }
                        });
                } catch(Exception e) {
-                       LOG.error("Exception occurred while launching Task 
Manager. Reason: {}", e);
+                       LOG.error("Exception occurred while launching Task 
Manager", e);
                        throw new RuntimeException(e);
                }
 
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index b5364f0..d09340c 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -24,7 +24,6 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
@@ -463,27 +462,17 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                }
        }
 
-       public static void main(final String[] args) {
-               final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", 
true); // no prefix for the YARN session
-
-               String confDirPath = 
CliFrontend.getConfigurationDirectoryFromEnv();
-               GlobalConfiguration.loadConfiguration(confDirPath);
+       public static void main(final String[] args) throws Exception {
+               final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", 
""); // no prefix for the YARN session
                Configuration flinkConfiguration = 
GlobalConfiguration.loadConfiguration();
-               
flinkConfiguration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, 
confDirPath);
-               try {
-                       SecurityContext.install(new 
SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration));
-                       int retCode = 
SecurityContext.getInstalled().runSecured(new 
SecurityContext.FlinkSecuredRunner<Integer>() {
-                               @Override
-                               public Integer run() {
-                                       return cli.run(args);
-                               }
-                       });
-                       System.exit(retCode);
-               } catch(Exception e) {
-                       e.printStackTrace();
-                       LOG.error("Exception Occured. Reason: {}", e);
-                       return;
-               }
+               SecurityContext.install(new 
SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration));
+               int retCode = SecurityContext.getInstalled().runSecured(new 
SecurityContext.FlinkSecuredRunner<Integer>() {
+                       @Override
+                       public Integer run() {
+                               return cli.run(args);
+                       }
+               });
+               System.exit(retCode);
        }
 
        @Override
@@ -544,7 +533,6 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                try {
                        return yarnClusterDescriptor.deploy();
                } catch (Exception e) {
-                       LOG.error("Error while deploying YARN cluster: 
"+e.getMessage(), e);
                        throw new RuntimeException("Error deploying the YARN 
cluster", e);
                }
 

Reply via email to