[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19272 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r151239494 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, +hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + require(driverEndpoint != null, "DriverEndpoint is not initialized") + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +logError(s"Failed to fetch Hadoop delegation tokens $e") +throw e +} + } + + private val keytabFile: Option[String] = conf.get(config.KEYTAB) + + scheduleTokenRenewal() + + private def scheduleTokenRenewal(): Unit = { +if (keytabFile.isDefined) { + require(principal != null, "Principal is required for Keytab-based authentication") + logInfo(s"Using keytab: ${keytabFile.get} and principal $principal") +} else { + logInfo("Using ticket cache for Kerberos authentication, no token renewal.") + return +} + +def 
scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") +runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { +
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r151219050 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, +hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + require(driverEndpoint != null, "DriverEndpoint is not initialized") + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +logError(s"Failed to fetch Hadoop delegation tokens $e") +throw e +} + } + + private val keytabFile: Option[String] = conf.get(config.KEYTAB) + + scheduleTokenRenewal() + + private def scheduleTokenRenewal(): Unit = { +if (keytabFile.isDefined) { + require(principal != null, "Principal is required for Keytab-based authentication") + logInfo(s"Using keytab: ${keytabFile.get} and principal $principal") +} else { + logInfo("Using ticket cache for Kerberos authentication, no token renewal.") + return +} + +def 
scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") +runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { +
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r151216053 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala --- @@ -463,6 +474,19 @@ object SparkHadoopUtil { } /** + * Given an expiration date (e.g. for Hadoop Delegation Tokens) return the date + * when a given fraction of the duration until the expiration date has passed. + * Formula: current time + (fraction * (time until expiration)) + * @param expirationDate Drop-dead expiration date + * @param fraction fraction of the time until expiration to return + * @return Date when the fraction of the time until expiration has passed + */ + def getDateOfNextUpdate(expirationDate: Long, fraction: Double): Long = { --- End diff -- Add `private[spark]`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r151217882 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, +hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + require(driverEndpoint != null, "DriverEndpoint is not initialized") + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +logError(s"Failed to fetch Hadoop delegation tokens $e") +throw e +} + } + + private val keytabFile: Option[String] = conf.get(config.KEYTAB) + + scheduleTokenRenewal() + + private def scheduleTokenRenewal(): Unit = { +if (keytabFile.isDefined) { + require(principal != null, "Principal is required for Keytab-based authentication") + logInfo(s"Using keytab: ${keytabFile.get} and principal $principal") +} else { + logInfo("Using ticket cache for Kerberos authentication, no token renewal.") + return +} + +def 
scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") +runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { +
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r151216025 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala --- @@ -140,13 +140,24 @@ class SparkHadoopUtil extends Logging { if (!new File(keytabFilename).exists()) { throw new SparkException(s"Keytab file: ${keytabFilename} does not exist") } else { - logInfo("Attempting to login to Kerberos" + -s" using principal: ${principalName} and keytab: ${keytabFilename}") + logInfo("Attempting to login to Kerberos " + +s"using principal: ${principalName} and keytab: ${keytabFilename}") UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename) } } /** + * Add or overwrite current user's credentials with serialized delegation tokens, + * also confirms correct hadoop configuration is set. + */ + def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) { --- End diff -- Always forget this class is public. Add `private[spark]`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150942920 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, +hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + require(driverEndpoint != null, "DriverEndpoint is not initialized") + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: Option[String], mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +logError("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") +throw e +} + } + + scheduleTokenRenewal() + + private def getSecretFile(conf: SparkConf): (Option[String], String) = { +val keytab = conf.get(config.KEYTAB) +val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR)) +val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) { + // if a 
keytab and a specific ticket cache is specified use the keytab and log the behavior + logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " + +s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})") + (keytab, "keytab") +} else { + val m = if (keytab.isDefined) "keytab" else "tgt" + val sf = if (keytab.isDefined) keytab else tgt + (sf, m) +} + +if (principal == null) { + require(mode == "tgt", s"Must specify a principal when using a Keytab, was $principal") + logInfo(s"Using ticket
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150934881 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, +hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + require(driverEndpoint != null, "DriverEndpoint is not initialized") + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: Option[String], mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +logError("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") +throw e +} + } + + scheduleTokenRenewal() + + private def getSecretFile(conf: SparkConf): (Option[String], String) = { +val keytab = conf.get(config.KEYTAB) +val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR)) +val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) { + // if a 
keytab and a specific ticket cache is specified use the keytab and log the behavior + logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " + +s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})") + (keytab, "keytab") +} else { + val m = if (keytab.isDefined) "keytab" else "tgt" + val sf = if (keytab.isDefined) keytab else tgt + (sf, m) +} + +if (principal == null) { + require(mode == "tgt", s"Must specify a principal when using a Keytab, was $principal") + logInfo(s"Using ticket
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150933634 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, +hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + require(driverEndpoint != null, "DriverEndpoint is not initialized") + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: Option[String], mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +logError("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") +throw e +} + } + + scheduleTokenRenewal() + + private def getSecretFile(conf: SparkConf): (Option[String], String) = { +val keytab = conf.get(config.KEYTAB) +val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR)) +val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) { + // if a 
keytab and a specific ticket cache is specified use the keytab and log the behavior + logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " + +s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})") + (keytab, "keytab") +} else { + val m = if (keytab.isDefined) "keytab" else "tgt" + val sf = if (keytab.isDefined) keytab else tgt + (sf, m) +} + +if (principal == null) { + require(mode == "tgt", s"Must specify a principal when using a Keytab, was $principal") + logInfo(s"Using ticket
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150717225 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, +hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + require(driverEndpoint != null, "DriverEndpoint is not initialized") + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: Option[String], mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +logError("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") +throw e +} + } + + scheduleTokenRenewal() + + private def getSecretFile(conf: SparkConf): (Option[String], String) = { +val keytab = conf.get(config.KEYTAB) +val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR)) +val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) { + // if a 
keytab and a specific ticket cache is specified use the keytab and log the behavior + logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " + +s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})") + (keytab, "keytab") +} else { + val m = if (keytab.isDefined) "keytab" else "tgt" + val sf = if (keytab.isDefined) keytab else tgt + (sf, m) +} + +if (principal == null) { + require(mode == "tgt", s"Must specify a principal when using a Keytab, was $principal") + logInfo(s"Using ticket
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150711597 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, +hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + require(driverEndpoint != null, "DriverEndpoint is not initialized") + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: Option[String], mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +logError("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") +throw e +} + } + + scheduleTokenRenewal() + + private def getSecretFile(conf: SparkConf): (Option[String], String) = { +val keytab = conf.get(config.KEYTAB) +val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR)) +val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) { + // if a 
keytab and a specific ticket cache is specified use the keytab and log the behavior + logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " + +s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})") + (keytab, "keytab") +} else { + val m = if (keytab.isDefined) "keytab" else "tgt" + val sf = if (keytab.isDefined) keytab else tgt + (sf, m) +} + +if (principal == null) { + require(mode == "tgt", s"Must specify a principal when using a Keytab, was $principal") + logInfo(s"Using ticket
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150711225 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, +hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + require(driverEndpoint != null, "DriverEndpoint is not initialized") + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: Option[String], mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +logError("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") +throw e +} + } + + scheduleTokenRenewal() + + private def getSecretFile(conf: SparkConf): (Option[String], String) = { +val keytab = conf.get(config.KEYTAB) +val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR)) +val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) { + // if a 
keytab and a specific ticket cache is specified use the keytab and log the behavior + logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " + +s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})") + (keytab, "keytab") +} else { + val m = if (keytab.isDefined) "keytab" else "tgt" + val sf = if (keytab.isDefined) keytab else tgt + (sf, m) +} + +if (principal == null) { + require(mode == "tgt", s"Must specify a principal when using a Keytab, was $principal") + logInfo(s"Using ticket
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150688646 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, +hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + require(driverEndpoint != null, "DriverEndpoint is not initialized") + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: Option[String], mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +logError("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") +throw e +} + } + + scheduleTokenRenewal() + + private def getSecretFile(conf: SparkConf): (Option[String], String) = { +val keytab = conf.get(config.KEYTAB) +val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR)) +val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) { + // if a 
keytab and a specific ticket cache is specified use the keytab and log the behavior + logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " + +s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})") + (keytab, "keytab") +} else { + val m = if (keytab.isDefined) "keytab" else "tgt" + val sf = if (keytab.isDefined) keytab else tgt + (sf, m) +} + +if (principal == null) { + require(mode == "tgt", s"Must specify a principal when using a Keytab, was $principal") + logInfo(s"Using ticket
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150053337 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: String, mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +throw new IllegalStateException("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") +} + } + + scheduleTokenRenewal() + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab = conf.get(config.KEYTAB).orNull +val tgt = conf.getenv("KRB5CCNAME") +require(keytab != null || tgt != null, "A keytab or TGT required.") --- End diff -- Is that really the case? `KRB5CCNAME` is not a required env variable. 
It has a default value, and the `UGI` class will use the credentials from the default location if they're available (and reload the cache periodically). So I think you don't really need this; you only need to track whether there's a principal and keytab. And you don't need to call `getUGIFromTicketCache` later on, since I'm pretty sure UGI takes care of that for you. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150054048 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: String, mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +throw new IllegalStateException("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") +} + } + + scheduleTokenRenewal() + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab = conf.get(config.KEYTAB).orNull +val tgt = conf.getenv("KRB5CCNAME") +require(keytab != null || tgt != null, "A keytab or TGT required.") +// if both Keytab and TGT are detected we use the Keytab. 
+val (secretFile, mode) = if (keytab != null && tgt != null) { + logWarning(s"Keytab and TGT were detected, using keytab, " + +s"unset ${config.KEYTAB.key} to use TGT") + (keytab, "keytab") +} else { + val m = if (keytab != null) "keytab" else "tgt" + val sf = if (keytab != null) keytab else tgt + (sf, m) +} + +if (principal == null) { + logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens") +} else { + logInfo(s"Using principal: $principal with mode: $mode to retrieve Hadoop delegation tokens") +} + +logDebug(s"secretFile is
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150049113 --- Diff: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala --- @@ -123,6 +123,9 @@ private[spark] class CoarseGrainedExecutorBackend( executor.stop() } }.start() + +case UpdateDelegationTokens(tokenBytes) => + SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf) --- End diff -- Can you add a `logInfo` saying the tokens are being updated? This has always been helpful when debugging issues with this feature on YARN. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150051400 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: String, mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +throw new IllegalStateException("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") +} + } + + scheduleTokenRenewal() + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab = conf.get(config.KEYTAB).orNull +val tgt = conf.getenv("KRB5CCNAME") +require(keytab != null || tgt != null, "A keytab or TGT required.") +// if both Keytab and TGT are detected we use the Keytab. 
+val (secretFile, mode) = if (keytab != null && tgt != null) { + logWarning(s"Keytab and TGT were detected, using keytab, " + +s"unset ${config.KEYTAB.key} to use TGT") + (keytab, "keytab") +} else { + val m = if (keytab != null) "keytab" else "tgt" + val sf = if (keytab != null) keytab else tgt + (sf, m) +} + +if (principal == null) { + logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens") +} else { + logInfo(s"Using principal: $principal with mode: $mode to retrieve Hadoop delegation tokens") +} + +logDebug(s"secretFile is
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150052163 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: String, mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +throw new IllegalStateException("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") --- End diff -- Use `e` as the cause of the exception you're throwing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150053751 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: String, mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +throw new IllegalStateException("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") +} + } + + scheduleTokenRenewal() + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab = conf.get(config.KEYTAB).orNull +val tgt = conf.getenv("KRB5CCNAME") +require(keytab != null || tgt != null, "A keytab or TGT required.") +// if both Keytab and TGT are detected we use the Keytab. 
+val (secretFile, mode) = if (keytab != null && tgt != null) { + logWarning(s"Keytab and TGT were detected, using keytab, " + +s"unset ${config.KEYTAB.key} to use TGT") + (keytab, "keytab") +} else { + val m = if (keytab != null) "keytab" else "tgt" + val sf = if (keytab != null) keytab else tgt + (sf, m) +} + +if (principal == null) { + logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens") +} else { + logInfo(s"Using principal: $principal with mode: $mode to retrieve Hadoop delegation tokens") +} + +logDebug(s"secretFile is
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150054305 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: String, mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +throw new IllegalStateException("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") +} + } + + scheduleTokenRenewal() + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab = conf.get(config.KEYTAB).orNull +val tgt = conf.getenv("KRB5CCNAME") +require(keytab != null || tgt != null, "A keytab or TGT required.") +// if both Keytab and TGT are detected we use the Keytab. 
+val (secretFile, mode) = if (keytab != null && tgt != null) { + logWarning(s"Keytab and TGT were detected, using keytab, " + +s"unset ${config.KEYTAB.key} to use TGT") + (keytab, "keytab") +} else { + val m = if (keytab != null) "keytab" else "tgt" + val sf = if (keytab != null) keytab else tgt + (sf, m) +} + +if (principal == null) { + logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens") +} else { + logInfo(s"Using principal: $principal with mode: $mode to retrieve Hadoop delegation tokens") +} + +logDebug(s"secretFile is
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150051073 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, hadoopConfig: Configuration, +driverEndpoint: RpcEndpointRef) + extends Logging { + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val tokenManager: HadoopDelegationTokenManager = +new HadoopDelegationTokenManager(conf, hadoopConfig) + + private val principal: String = conf.get(config.PRINCIPAL).orNull + + private val (secretFile: String, mode: String) = getSecretFile(conf) + + private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +throw new IllegalStateException("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") +} + } + + scheduleTokenRenewal() + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab = conf.get(config.KEYTAB).orNull +val tgt = conf.getenv("KRB5CCNAME") +require(keytab != null || tgt != null, "A keytab or TGT required.") +// if both Keytab and TGT are detected we use the Keytab. 
+val (secretFile, mode) = if (keytab != null && tgt != null) { + logWarning(s"Keytab and TGT were detected, using keytab, " + +s"unset ${config.KEYTAB.key} to use TGT") + (keytab, "keytab") +} else { + val m = if (keytab != null) "keytab" else "tgt" + val sf = if (keytab != null) keytab else tgt + (sf, m) +} + +if (principal == null) { + logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens") --- End diff -- You should probably assert that mode is "tgt" in this case. --- - To unsubscribe,
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150049758 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -236,7 +234,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val reply = SparkAppConfig( sparkProperties, SparkEnv.get.securityManager.getIOEncryptionKey(), - hadoopDelegationCreds) + hadoopDelegationTokens.apply()) --- End diff -- Can't you just call `fetchHadoopDelegationTokens()` directly? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150050571 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf + * of the MesosCoarseGrainedSchedulerBackend. 
It is modeled after the YARN AMCredentialRenewer, + * and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +private[spark] class MesosHadoopDelegationTokenManager( +conf: SparkConf, hadoopConfig: Configuration, --- End diff -- One arg per line. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r150049429 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -159,6 +151,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.getExecutorsAliveOnHost(host).foreach { exec => killExecutors(exec.toSeq, replace = true, force = true) } + + case UpdateDelegationTokens(newDelegationTokens) => +// Update the driver's delegation tokens in case new executors are added later. --- End diff -- Stale comment? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r149797247 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -213,6 +216,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far, then start the token renewer +if (hadoopDelegationTokens.isDefined) { --- End diff -- Check out the patch now. `hadoopDelegationTokens` now calls `initializeHadoopDelegationTokens` (renamed `fetchHadoopDelegationTokens`) by name: ```scala private val hadoopDelegationTokens: () => Option[Array[Byte]] = fetchHadoopDelegationTokens ``` This has the effect of only generating the first set of delegation tokens once the first `RetrieveSparkAppConfig` message is received. At this point, everything has been initialized because renewer (renamed `MesosHadoopDelegationTokenManager`) is evaluated lazily with the correct `driverEndpoint`. It's a bit confusing to just avoid an extra conditional. WDYT? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r149775044 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -213,6 +216,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far, then start the token renewer +if (hadoopDelegationTokens.isDefined) { --- End diff -- You could call `initializeHadoopDelegationTokens` in `start` after everything that's needed is initialized. It would also better follow the scheduler's lifecycle. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r149564294 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -213,6 +216,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far, then start the token renewer +if (hadoopDelegationTokens.isDefined) { --- End diff -- I may have spoken too soon; there might be a way... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r149549953 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -213,6 +216,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far, then start the token renewer +if (hadoopDelegationTokens.isDefined) { --- End diff -- I agree that I shouldn't need to use the conditional `hadoopDelegationTokens.isDefined`, however there will need to be some check (`UserGroupInformation.isSecurityEnabled` or similar) to pass the `driverEndpoint` to the renewer/manager here. When the initial tokens are generated `driverEndpoint` is still `None` because `start()` hasn't been called yet. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r149235352 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -99,11 +96,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 - // hadoop token manager used by some sub-classes (e.g. Mesos) - def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None - - // Hadoop delegation tokens to be sent to the executors. - val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds() + // Hadoop delegation tokens to be sent to the executors, can be updated as necessary. + protected var hadoopDelegationTokens: Option[Array[Byte]] = initializeHadoopDelegationTokens() --- End diff -- Why is this protected? There's no reason I can see for subclasses to need access to this field. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r149237172 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -772,6 +783,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( offer.getHostname } } + + override def initializeHadoopDelegationTokens(): Option[Array[Byte]] = { +if (UserGroupInformation.isSecurityEnabled) { + Some(hadoopCredentialRenewer.tokens) --- End diff -- So, it seems to me that your "renewer" is doing more than just renewing tokens; it's also being used to generate the initial set. So aside from my comments about initializing the renewer here, you should also probably make this API a little cleaner. Right now there's too much coupling. The renewer should do renewals only; otherwise it should be called something different. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r149236687 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -58,8 +60,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with org.apache.mesos.Scheduler with MesosSchedulerUtils { - override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = -Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration)) + private lazy val hadoopCredentialRenewer: MesosCredentialRenewer = +new MesosCredentialRenewer( + conf, new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration)) --- End diff -- Why pass in a `HadoopDelegationTokenManager` if it's not used by this class? The renewer can create one itself. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r149236410 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosCredentialRenewer will update the Hadoop credentials for Spark drivers accessing + * secured services using Kerberos authentication. It is modeled after the YARN AMCredential + * renewer, and similarly will renew the Credentials when 75% of the renewal interval has passed. 
+ * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * recieved they overwrite the current credentials. + */ +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager) extends Logging { + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val principal = conf.get(config.PRINCIPAL).orNull + + private val (secretFile, mode) = getSecretFile(conf) + + var (tokens: Array[Byte], timeOfNextRenewal: Long) = { +try { + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + (SparkHadoopUtil.get.serialize(creds), rt) +} catch { + case e: Exception => +throw new IllegalStateException("Failed to initialize Hadoop delegation tokens\n" + + s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e") +} + + } + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab = conf.get(config.KEYTAB).orNull +val tgt = conf.getenv("KRB5CCNAME") +require(keytab != null || tgt != null, "A keytab or TGT required.") +// if both Keytab and TGT are detected we use the Keytab. 
+val (secretFile, mode) = if (keytab != null && tgt != null) { + logWarning(s"Keytab and TGT were detected, using keytab, " + +s"unset ${config.KEYTAB.key} to use TGT") + (keytab, "keytab") +} else { + val m = if (keytab != null) "keytab" else "tgt" + val sf = if (keytab != null) keytab else tgt + (sf, m) +} + +if (principal == null) { + logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens") +} else { + logInfo(s"Using principal: $principal with mode: $mode to retrieve Hadoop delegation tokens") +} + +logDebug(s"secretFile is $secretFile") +(secretFile, mode) + } + + def scheduleTokenRenewal(driverEndpoint: RpcEndpointRef): Unit = { --- End diff -- Why isn't this done in the constructor? There's a single call to this method, and the renewal interval could very easily be turned into a constructor arg. --- - To
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r149235095 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -55,7 +55,9 @@ private[spark] class HadoopDelegationTokenManager( logDebug(s"Using the following delegation token providers: " + s"${delegationTokenProviders.keys.mkString(", ")}.") - /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */ + /** --- End diff -- This is not really changing anything, so I'd just revert changes to this file. Or, if you really want to, just keep the new `@param`s you're adding below. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r149235641 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -213,6 +216,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far, then start the token renewer +if (hadoopDelegationTokens.isDefined) { --- End diff -- You shouldn't do this here, otherwise you need to keep that field `protected` in the parent class and that adds unnecessary coupling. Instead, do this in `initializeHadoopDelegationTokens`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r149235832 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosCredentialRenewer will update the Hadoop credentials for Spark drivers accessing + * secured services using Kerberos authentication. It is modeled after the YARN AMCredential + * renewer, and similarly will renew the Credentials when 75% of the renewal interval has passed. 
+ * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +class MesosCredentialRenewer( --- End diff -- `private[spark]` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r149235915 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosCredentialRenewer will update the Hadoop credentials for Spark drivers accessing + * secured services using Kerberos authentication. It is modeled after the YARN AMCredential + * renewer, and similarly will renew the Credentials when 75% of the renewal interval has passed. 
+ * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * received they overwrite the current credentials. + */ +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager) extends Logging { + + private val credentialRenewerThread: ScheduledExecutorService = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + private val principal = conf.get(config.PRINCIPAL).orNull + + private val (secretFile, mode) = getSecretFile(conf) + + var (tokens: Array[Byte], timeOfNextRenewal: Long) = { --- End diff -- `private`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r148982457 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -58,8 +62,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with org.apache.mesos.Scheduler with MesosSchedulerUtils { - override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = -Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration)) + private lazy val hadoopCredentialRenewer: MesosCredentialRenewer = --- End diff -- For Mesos the credential renewer contains the tokens, the renewal time, and all logic to renew the tokens. Should never be evaluated (and tokens never initialized) if `UserGroupInformation.isSecurityEnabled` evaluates to `false` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r148982255 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -686,18 +687,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp true } - protected def getHadoopDelegationCreds(): Option[Array[Byte]] = { --- End diff -- This method was only called once, and would discard the renewal time information, limiting its utility. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r148982165 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -99,11 +96,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 - // hadoop token manager used by some sub-classes (e.g. Mesos) - def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None --- End diff -- No longer needed because resource-manager backends (may) implement their own `initializeHadoopDelegationTokens`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r148341472 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -102,8 +99,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // hadoop token manager used by some sub-classes (e.g. Mesos) def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None - // Hadoop delegation tokens to be sent to the executors. - val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds() + def hadoopCredentialRenewer: Option[HadoopCredentialRenewer] = None --- End diff -- Why is this needed here? This class doesn't use it at all. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r148337519 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopCredentialRenewer.scala --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.spark.internal.Logging + +/** + * Abstract class for credential renewers used by YARN and Mesos. To renew delegation tokens + * the scheduler backend calls [[scheduleTokenRenewal]]. The implementation of this method + * (and the dispersal of tokens) is resource-manager-specific, see implementations of this + * class for details. + */ +abstract class HadoopCredentialRenewer extends Logging { --- End diff -- `private[spark]` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r148339089 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -75,6 +78,17 @@ private[spark] class HadoopDelegationTokenManager( .toMap } + def getRenewableDelegationTokens(): Option[RenewableDelegationTokens] = { --- End diff -- I'm not too fond of the idea of having two methods for creating tokens (this and `obtainDelegationTokens`). One should be enough. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r148342319 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -102,8 +99,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // hadoop token manager used by some sub-classes (e.g. Mesos) def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None - // Hadoop delegation tokens to be sent to the executors. - val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds() + def hadoopCredentialRenewer: Option[HadoopCredentialRenewer] = None + + // Hadoop delegation tokens to be sent to the executors, can be updated as necessary. + var renewableDelegationTokens: Option[RenewableDelegationTokens] = --- End diff -- This is another part where the interface is brittle and a bit confusing. This should really be a private field, because there should be a single way for sub-classes to update it (send a `UpdateDelegationTokens` message). But it's not because Mesos needs to poke at it because the rest of the API is all a bit weird. If instead you follow my original suggestion (have a abstract `initializeDelegationTokens()` method that Mesos overrides), then everything becomes a lot more localized and cleaner. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r148338530 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -125,3 +141,5 @@ private[spark] class HadoopDelegationTokenManager( }.foldLeft(Long.MaxValue)(math.min) } } + +case class RenewableDelegationTokens(credentials: Array[Byte], nextRenewalTime: Long) --- End diff -- This is not a great name for this class; it doesn't provide any functionality related to renewing the tokens, so it's not really clear why it's called "renewable". If this actually wrapped the `HadoopDelegationTokenManager` and provided a `renew` method or something, that would be better. But at the same time this is starting to make this API too confusing. So two options here: either use the existing API and make the caller hold on to the two pieces of info (tokens and renewal time), or change the existing API. But don't add a new API that basically does the same thing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r148337992 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -55,7 +56,9 @@ private[spark] class HadoopDelegationTokenManager( logDebug(s"Using the following delegation token providers: " + s"${delegationTokenProviders.keys.mkString(", ")}.") - /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */ + /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem with a --- End diff -- Comment starts on next line. But the comment doesn't really seem accurate anyway. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r148013037 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -233,10 +249,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp context.reply(true) case RetrieveSparkAppConfig => +val tokens = if (renewableDelegationTokens.isDefined) { + Some(renewableDelegationTokens.get.credentials) +} else { + None +} val reply = SparkAppConfig( sparkProperties, SparkEnv.get.securityManager.getIOEncryptionKey(), - hadoopDelegationCreds) + tokens) --- End diff -- could make `SparkAppConfig` just take a `RenewableDelegationTokens` object? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r147866441 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala --- @@ -17,7 +17,7 @@ package org.apache.spark.scheduler.cluster.mesos -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.SparkContext --- End diff -- `SparkException` is unused, not sure why it was there in the first place --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146946137 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -213,6 +216,24 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && currentHadoopDelegationTokens.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + // The renewal time is ignored when getting the initial delegation tokens + // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), so we get the renewal + // time here and schedule a thread to renew them. + val renewalTime = --- End diff -- But do note that you still can't start the token renewer automatically, since YARN still has its own renewer and things would get mixed up. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146943847 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -213,6 +216,24 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && currentHadoopDelegationTokens.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + // The renewal time is ignored when getting the initial delegation tokens + // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), so we get the renewal + // time here and schedule a thread to renew them. + val renewalTime = --- End diff -- I guess that would be ok. I was a little worried about mixing the two but they are kinda closely related... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146855311 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -213,6 +216,24 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && currentHadoopDelegationTokens.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + // The renewal time is ignored when getting the initial delegation tokens + // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), so we get the renewal + // time here and schedule a thread to renew them. + val renewalTime = --- End diff -- Maybe an overly simplistic solution, but what about putting this functionality into `HadoopDelegationTokenManager`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146622470 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -213,6 +216,24 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && currentHadoopDelegationTokens.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + // The renewal time is ignored when getting the initial delegation tokens + // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), so we get the renewal + // time here and schedule a thread to renew them. + val renewalTime = --- End diff -- It makes more sense to have it in `CoarseGrainedSchedulerBackend` (then YARN and Mesos can easily share it); my worry is exposing the functionality to standalone (it would work, but not be secure, which is a problem). So maybe add a second method `def supportsTokenRenewal` that subclasses can override to return `true` or something. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146622564 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala --- @@ -59,8 +59,7 @@ private[yarn] class AMCredentialRenewer( private var lastCredentialsFileSuffix = 0 private val credentialRenewer = -Executors.newSingleThreadScheduledExecutor( - ThreadUtils.namedThreadFactory("Credential Refresh Thread")) +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread") --- End diff -- It's ok since you've done it, I was mostly making a generic comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146473582 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala --- @@ -59,8 +59,7 @@ private[yarn] class AMCredentialRenewer( private var lastCredentialsFileSuffix = 0 private val credentialRenewer = -Executors.newSingleThreadScheduledExecutor( - ThreadUtils.namedThreadFactory("Credential Refresh Thread")) +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread") --- End diff -- I generally agree, but I did this mostly to keep these two classes as congruent as possible. I can change it back if you feel strongly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146472162 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -213,6 +216,24 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && currentHadoopDelegationTokens.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + // The renewal time is ignored when getting the initial delegation tokens + // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), so we get the renewal + // time here and schedule a thread to renew them. + val renewalTime = --- End diff -- Ok, I agree. I refrained from touching the current code only because it was added it so recently. I assumed the omission of renewal time was intentional (or at least semi-intentional). I'll submit a refactor. Are you suggesting that we make the renewal thread part of `CoarseGrainedSchedulerBackend` (with the appropriate override for YARN/Mesos)? Or make a resource-manager specific method? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146435148 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -159,6 +159,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.getExecutorsAliveOnHost(host).foreach { exec => killExecutors(exec.toSeq, replace = true, force = true) } + + case UpdateDelegationTokens(tokens) => +// Update the driver's delegation tokens in case new executors are added later. +currentHadoopDelegationTokens = Some(tokens) +executorDataMap.values.foreach { ed => + ed.executorEndpoint.send(UpdateDelegationTokens(tokens)) } --- End diff -- `}` goes in next line. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146436646 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala --- @@ -59,8 +59,7 @@ private[yarn] class AMCredentialRenewer( private var lastCredentialsFileSuffix = 0 private val credentialRenewer = -Executors.newSingleThreadScheduledExecutor( - ThreadUtils.namedThreadFactory("Credential Refresh Thread")) +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread") --- End diff -- Normally you should avoid making changes that are not related to your PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146434999 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala --- @@ -17,7 +17,7 @@ package org.apache.spark.scheduler.cluster.mesos -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.SparkContext --- End diff -- Change not needed? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146436571 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosCredentialRenewer will update the Hadoop credentials for Spark drivers accessing + * secured services using Kerberos authentication. 
It is modeled after the YARN AMCredential + * renewer, and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * recieved they overwrite the current credentials. + */ +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +driverEndpoint: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get(config.PRINCIPAL).orNull + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab = conf.get(config.KEYTAB).orNull +val tgt = conf.getenv("KRB5CCNAME") +require(keytab != null || tgt != null, "A keytab or TGT required.") +// if both Keytab and TGT are detected we use the Keytab. 
+val (secretFile, mode) = if (keytab != null && tgt != null) { + logWarning(s"Keytab and TGT were detected, using keytab, unset $keytab to use TGT") + (keytab, "keytab") +} else { + val m = if (keytab != null) "keytab" else "tgt" + val sf = if (keytab != null) keytab else tgt + (sf, m) +} +logInfo(s"Logging in as $principal with mode $mode to retrieve Hadoop delegation tokens") +logDebug(s"secretFile is $secretFile") +(secretFile, mode) + } + + def scheduleTokenRenewal(): Unit = { +def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") +runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { + try { +val creds = getRenewedDelegationTokens(conf) +broadcastDelegationTokens(creds) + } catch { +case e: Exception => + // Log the error and try to write new tokens back in an hour +
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146436883 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosCredentialRenewer will update the Hadoop credentials for Spark drivers accessing + * secured services using Kerberos authentication. 
It is modeled after the YARN AMCredential + * renewer, and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * recieved they overwrite the current credentials. + */ +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +driverEndpoint: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get(config.PRINCIPAL).orNull + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab = conf.get(config.KEYTAB).orNull +val tgt = conf.getenv("KRB5CCNAME") +require(keytab != null || tgt != null, "A keytab or TGT required.") +// if both Keytab and TGT are detected we use the Keytab. +val (secretFile, mode) = if (keytab != null && tgt != null) { + logWarning(s"Keytab and TGT were detected, using keytab, unset $keytab to use TGT") --- End diff -- `${KEYTAB.key}`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146437027 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosCredentialRenewer will update the Hadoop credentials for Spark drivers accessing + * secured services using Kerberos authentication. 
It is modeled after the YARN AMCredential + * renewer, and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * recieved they overwrite the current credentials. + */ +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +driverEndpoint: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get(config.PRINCIPAL).orNull + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab = conf.get(config.KEYTAB).orNull +val tgt = conf.getenv("KRB5CCNAME") +require(keytab != null || tgt != null, "A keytab or TGT required.") +// if both Keytab and TGT are detected we use the Keytab. 
+val (secretFile, mode) = if (keytab != null && tgt != null) { + logWarning(s"Keytab and TGT were detected, using keytab, unset $keytab to use TGT") + (keytab, "keytab") +} else { + val m = if (keytab != null) "keytab" else "tgt" + val sf = if (keytab != null) keytab else tgt + (sf, m) +} +logInfo(s"Logging in as $principal with mode $mode to retrieve Hadoop delegation tokens") +logDebug(s"secretFile is $secretFile") +(secretFile, mode) + } + + def scheduleTokenRenewal(): Unit = { +def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") +runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { + try { +val creds = getRenewedDelegationTokens(conf) +broadcastDelegationTokens(creds) + } catch { +case e: Exception => + // Log the error and try to write new tokens back in an hour +
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146436327 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -213,6 +216,24 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && currentHadoopDelegationTokens.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + // The renewal time is ignored when getting the initial delegation tokens + // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), so we get the renewal + // time here and schedule a thread to renew them. + val renewalTime = --- End diff -- I still don't like this. You should not need to implement this separate method of getting the renewal time just because the current code is throwing out that information. Instead you should fix the code so that the information is preserved. `getHadoopDelegationCreds` is called in only one place, so my suggestion would be to encapsulate initializing the token manager and getting the initial set of tokens into a single method (instead of the current two). Then in that method's implementation you can get the initial set of tokens, initialize the renewer thread with the correct renewal period, and return the data needed by the scheduler. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146434552 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -194,6 +198,27 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && hadoopDelegationCreds.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + val credentialRenewerThread = new Thread { +setName("MesosCredentialRenewer") +override def run(): Unit = { + val rt = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf) + val credentialRenewer = +new MesosCredentialRenewer( + conf, + hadoopDelegationTokenManager.get, + MesosCredentialRenewer.getNextRenewalTime(rt), + driverEndpoint) + credentialRenewer.scheduleTokenRenewal() +} + } + + credentialRenewerThread.start() + credentialRenewerThread.join() --- End diff -- That's the gist of it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146094019 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -194,6 +198,27 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && hadoopDelegationCreds.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + val credentialRenewerThread = new Thread { +setName("MesosCredentialRenewer") +override def run(): Unit = { + val rt = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf) + val credentialRenewer = +new MesosCredentialRenewer( + conf, + hadoopDelegationTokenManager.get, + MesosCredentialRenewer.getNextRenewalTime(rt), + driverEndpoint) + credentialRenewer.scheduleTokenRenewal() +} + } + + credentialRenewerThread.start() + credentialRenewerThread.join() --- End diff -- Ok, you're probably right. It appears that the YARN code uses `setContextClassLoader(userClassLoader)` whereas Mesos does not have a notion of `userClassLoader`. Therefore we don't need the separate thread in the Mesos code. Do I have this correct? Thanks for showing me this! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146052502 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -194,6 +198,27 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && hadoopDelegationCreds.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + val credentialRenewerThread = new Thread { +setName("MesosCredentialRenewer") +override def run(): Unit = { + val rt = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf) + val credentialRenewer = +new MesosCredentialRenewer( + conf, + hadoopDelegationTokenManager.get, + MesosCredentialRenewer.getNextRenewalTime(rt), + driverEndpoint) + credentialRenewer.scheduleTokenRenewal() +} + } + + credentialRenewerThread.start() + credentialRenewerThread.join() --- End diff -- I don't think you really understood why the YARN code needs a thread and why I'm telling you this code does not. Read the comment you added here again; what makes you think the current thread does not have access to those classes? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r145861062 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -194,6 +198,27 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && hadoopDelegationCreds.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + val credentialRenewerThread = new Thread { +setName("MesosCredentialRenewer") +override def run(): Unit = { + val rt = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf) + val credentialRenewer = +new MesosCredentialRenewer( + conf, + hadoopDelegationTokenManager.get, + MesosCredentialRenewer.getNextRenewalTime(rt), + driverEndpoint) + credentialRenewer.scheduleTokenRenewal() +} + } + + credentialRenewerThread.start() + credentialRenewerThread.join() --- End diff -- Yes, sorry, for some reason I understood you to mean the credential renewer itself. I added a comment to the same effect as the YARN analogue. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r145487542 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -194,6 +198,27 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && hadoopDelegationCreds.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + val credentialRenewerThread = new Thread { +setName("MesosCredentialRenewer") +override def run(): Unit = { + val rt = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf) + val credentialRenewer = +new MesosCredentialRenewer( + conf, + hadoopDelegationTokenManager.get, + MesosCredentialRenewer.getNextRenewalTime(rt), + driverEndpoint) + credentialRenewer.scheduleTokenRenewal() +} + } + + credentialRenewerThread.start() + credentialRenewerThread.join() --- End diff -- There's a comment explaining why that thread exists right above the code you linked. Did you look at it? Also, you're calling `join()` on the thread, so it's obviously going away. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r145301221 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -194,6 +198,27 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && hadoopDelegationCreds.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + val credentialRenewerThread = new Thread { +setName("MesosCredentialRenewer") +override def run(): Unit = { + val rt = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf) + val credentialRenewer = +new MesosCredentialRenewer( + conf, + hadoopDelegationTokenManager.get, + MesosCredentialRenewer.getNextRenewalTime(rt), + driverEndpoint) + credentialRenewer.scheduleTokenRenewal() +} + } + + credentialRenewerThread.start() + credentialRenewerThread.join() --- End diff -- Yes, I believe so. If you look here (https://github.com/apache/spark/blob/6735433cde44b3430fb44edfff58ef8149c66c13/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L271) it's the same pattern. The thread needs to run as long as the application driver, correct? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r145300847 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( --- End diff -- I also changed `AMCredentialRenewer` to the same. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r145298313 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { --- End diff -- fixed. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r143854246 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab64 = conf.get("spark.yarn.keytab", null) +val tgt64 = System.getenv("KRB5CCNAME") +require(keytab64 != null || tgt64 != null, "keytab or tgt required") +require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time") --- End diff -- `KRB5CCNAME` is something that people might have in their environment for various reasons, so I'd avoid this requirement. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r143855260 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab64 = conf.get("spark.yarn.keytab", null) +val tgt64 = System.getenv("KRB5CCNAME") +require(keytab64 != null || tgt64 != null, "keytab or tgt required") +require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time") +val mode = if (keytab64 != null) "keytab" else "tgt" +val secretFile = if (keytab64 != null) keytab64 else tgt64 +logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens") +logDebug(s"secretFile is $secretFile") +(secretFile, mode) + } + + def scheduleTokenRenewal(): Unit = { +def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") 
+runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { + try { +val creds = getRenewedDelegationTokens(conf) +broadcastDelegationTokens(creds) + } catch { +case e: Exception => + // Log the error and try to write new tokens back in an hour + logWarning("Couldn't broadcast tokens, trying again in an hour", e) + credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS) + return + } + scheduleRenewal(this) +} + } +scheduleRenewal(credentialRenewerRunnable) + } + + private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = { +logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", null)}") +// Get new delegation tokens by logging in with a new UGI +// inspired by AMCredentialRenewer.scala:L174 +val ugi = if (mode == "keytab") { --- End diff -- The correct way would be for the credential management code to differentiate between token creation and token renewal; that way it would renew tokens at the renewal interval and create new ones after the max
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r143835301 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala --- @@ -142,6 +142,13 @@ class SparkHadoopUtil extends Logging { } } + def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) { --- End diff -- Add a comment about what this method is doing and why it's needed. (YARN never sets the authentication method, so it'd be good to know why Mesos needs to do it.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r143838652 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") --- End diff -- Use the `PRINCIPAL` constant. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r143836011 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -60,6 +62,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration)) + private val principal = conf.get("spark.yarn.principal", null) --- End diff -- This config is defined in core already (`PRINCIPAL`). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r143839588 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab64 = conf.get("spark.yarn.keytab", null) +val tgt64 = System.getenv("KRB5CCNAME") --- End diff -- `64`? Also, using `conf.getenv` would allow tests to be written. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r143835387 --- Diff: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala --- @@ -123,6 +123,11 @@ private[spark] class CoarseGrainedExecutorBackend( executor.stop() } }.start() + +// This message is only sent by Mesos Drivers, and is not expected from other --- End diff -- No need to add this comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r143836825 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -194,6 +198,27 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && hadoopDelegationCreds.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + val credentialRenewerThread = new Thread { +setName("MesosCredentialRenewer") +override def run(): Unit = { + val rt = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf) + val credentialRenewer = +new MesosCredentialRenewer( + conf, + hadoopDelegationTokenManager.get, + MesosCredentialRenewer.getNextRenewalTime(rt), + driverEndpoint) + credentialRenewer.scheduleTokenRenewal() +} + } + + credentialRenewerThread.start() + credentialRenewerThread.join() --- End diff -- Why do you need this thread if it's going to be short-lived? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r143857596 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { --- End diff -- How about a more descriptive variable name? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r143838709 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab64 = conf.get("spark.yarn.keytab", null) --- End diff -- Similarly there's a `KEYTAB` constant. Also why `64`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r143835719 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -159,6 +159,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.getExecutorsAliveOnHost(host).foreach { exec => killExecutors(exec.toSeq, replace = true, force = true) } + + case UpdateDelegationTokens(tokens) => +executorDataMap.values.foreach { + ed => ed.executorEndpoint.send(UpdateDelegationTokens(tokens)) --- End diff -- `ed =>` goes in previous line. (The whole thing might fit in one line, too.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r143837225 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( --- End diff -- `ThreadUtils.newDaemonSingleThreadScheduledExecutor`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r143856998 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -194,6 +198,27 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && hadoopDelegationCreds.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + val credentialRenewerThread = new Thread { +setName("MesosCredentialRenewer") +override def run(): Unit = { + val rt = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf) --- End diff -- So, you need this because `hadoopDelegationCreds` doesn't keep the information about when the tokens should be renewed (a.k.a. the return value of `obtainDelegationTokens`). Perhaps some minor refactoring would help clean this up? In fact, `hadoopDelegationCreds` is a `val`, so any executors that start after the initial token set expires will fail, no? Because they'll fetch `hadoopDelegationCreds` from the driver, and won't get the `UpdateDelegationTokens` until it's way too late. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user kalvinnchau commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140830852 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -194,6 +198,26 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && hadoopDelegationCreds.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + val credentialRenewerThread = new Thread { +setName("MesosCredentialRenewer") +override def run(): Unit = { + val credentialRenewer = +new MesosCredentialRenewer( + conf, + hadoopDelegationTokenManager.get, + MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf), --- End diff -- This sets the first renewal time to be the expiration time of the token. It should be similar to the way next renewal time in the MesosCredentialRenewer class is calculated so that it renews the first token after 75% of expiration time has passed: ```scala val currTime = System.currentTimeMillis() val renewTime = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf) val rt = 0.75 * (renewTime - currTime) val credentialRenewer = new MesosCredentialRenewer( conf, hadoopDelegationTokenManager.get, (currTime + rt).toLong, driverEndpoint) credentialRenewer.scheduleTokenRenewal() ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140625136 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab64 = conf.get("spark.yarn.keytab", null) +val tgt64 = System.getenv("KRB5CCNAME") +require(keytab64 != null || tgt64 != null, "keytab or tgt required") +require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time") +val mode = if (keytab64 != null) "keytab" else "tgt" +val secretFile = if (keytab64 != null) keytab64 else tgt64 +logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens") +logDebug(s"secretFile is $secretFile") +(secretFile, mode) + } + + def scheduleTokenRenewal(): Unit = { +def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") 
+runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { + try { +val creds = getRenewedDelegationTokens(conf) +broadcastDelegationTokens(creds) + } catch { +case e: Exception => + // Log the error and try to write new tokens back in an hour + logWarning("Couldn't broadcast tokens, trying again in an hour", e) + credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS) + return + } + scheduleRenewal(this) +} + } +scheduleRenewal(credentialRenewerRunnable) + } + + private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = { +logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", null)}") +// Get new delegation tokens by logging in with a new UGI +// inspired by AMCredentialRenewer.scala:L174 +val ugi = if (mode == "keytab") { --- End diff -- Hello @kalvinnchau You are correct, all this does is keep track of when the tokens will expire and renew them at that time. Part of my motivation for doing this is to avoid writing any files to disk (like new
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user kalvinnchau commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140326376 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab64 = conf.get("spark.yarn.keytab", null) +val tgt64 = System.getenv("KRB5CCNAME") +require(keytab64 != null || tgt64 != null, "keytab or tgt required") +require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time") +val mode = if (keytab64 != null) "keytab" else "tgt" +val secretFile = if (keytab64 != null) keytab64 else tgt64 +logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens") +logDebug(s"secretFile is $secretFile") +(secretFile, mode) + } + + def scheduleTokenRenewal(): Unit = { +def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") 
+runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { + try { +val creds = getRenewedDelegationTokens(conf) +broadcastDelegationTokens(creds) + } catch { +case e: Exception => + // Log the error and try to write new tokens back in an hour + logWarning("Couldn't broadcast tokens, trying again in an hour", e) + credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS) + return + } + scheduleRenewal(this) +} + } +scheduleRenewal(credentialRenewerRunnable) + } + + private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = { +logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", null)}") +// Get new delegation tokens by logging in with a new UGI +// inspired by AMCredentialRenewer.scala:L174 +val ugi = if (mode == "keytab") { --- End diff -- I don't see where it refreshes the delegation tokens until the max-lifetime, then re-login with the keytab to get a new delegation tokens that'll last until the max-lifetime. Does this skip over
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140118143 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -63,7 +63,8 @@ class MesosCredentialRenewer( def scheduleTokenRenewal(): Unit = { def scheduleRenewal(runnable: Runnable): Unit = { - val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + // val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + val remainingTime = 5000 --- End diff -- well that's embarrassing, just a debugging tool that I forgot to remove. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140117834 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -198,16 +198,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) -if (principal != null) { +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && hadoopDelegationCreds.isDefined) { logDebug(s"Principal found ($principal) starting token renewer") val credentialRenewerThread = new Thread { setName("MesosCredentialRenewer") override def run(): Unit = { + val dummy: Option[Array[Byte]] = None --- End diff -- whoops! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user susanxhuynh commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140117253 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -63,7 +63,8 @@ class MesosCredentialRenewer( def scheduleTokenRenewal(): Unit = { def scheduleRenewal(runnable: Runnable): Unit = { - val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + // val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + val remainingTime = 5000 --- End diff -- Why 5000? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user susanxhuynh commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140117055 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -198,16 +198,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) -if (principal != null) { +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && hadoopDelegationCreds.isDefined) { logDebug(s"Principal found ($principal) starting token renewer") val credentialRenewerThread = new Thread { setName("MesosCredentialRenewer") override def run(): Unit = { + val dummy: Option[Array[Byte]] = None --- End diff -- What is this for? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140088442 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab64 = conf.get("spark.yarn.keytab", null) +val tgt64 = System.getenv("KRB5CCNAME") +require(keytab64 != null || tgt64 != null, "keytab or tgt required") +require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time") +val mode = if (keytab64 != null) "keytab" else "tgt" +val secretFile = if (keytab64 != null) keytab64 else tgt64 +logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens") +logDebug(s"secretFile is $secretFile") +(secretFile, mode) + } + + def scheduleTokenRenewal(): Unit = { +def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") 
+runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { + try { +val creds = getRenewedDelegationTokens(conf) +broadcastDelegationTokens(creds) + } catch { +case e: Exception => + // Log the error and try to write new tokens back in an hour + logWarning("Couldn't broadcast tokens, trying agin in 20 seconds", e) + credentialRenewerThread.schedule(this, 20, TimeUnit.SECONDS) + return + } + scheduleRenewal(this) +} + } +scheduleRenewal(credentialRenewerRunnable) + } + + private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = { +logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", null)}") +// Get new delegation tokens by logging in with a new UGI +// inspired by AMCredentialRenewer.scala:L174 +val ugi = if (mode == "keytab") { + UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, secretFile) +} else { + UserGroupInformation.getUGIFromTicketCache(secretFile, principal) +} +val tempCreds =
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140078733 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab64 = conf.get("spark.yarn.keytab", null) +val tgt64 = System.getenv("KRB5CCNAME") +require(keytab64 != null || tgt64 != null, "keytab or tgt required") +require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time") +val mode = if (keytab64 != null) "keytab" else "tgt" +val secretFile = if (keytab64 != null) keytab64 else tgt64 +logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens") +logDebug(s"secretFile is $secretFile") +(secretFile, mode) + } + + def scheduleTokenRenewal(): Unit = { +def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") 
+runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { + try { +val creds = getRenewedDelegationTokens(conf) +broadcastDelegationTokens(creds) + } catch { +case e: Exception => + // Log the error and try to write new tokens back in an hour --- End diff -- good catch, I changed the code to match the YARN equivalent. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r139847070 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab64 = conf.get("spark.yarn.keytab", null) +val tgt64 = System.getenv("KRB5CCNAME") +require(keytab64 != null || tgt64 != null, "keytab or tgt required") +require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time") +val mode = if (keytab64 != null) "keytab" else "tgt" +val secretFile = if (keytab64 != null) keytab64 else tgt64 +logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens") +logDebug(s"secretFile is $secretFile") +(secretFile, mode) + } + + def scheduleTokenRenewal(): Unit = { +def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") 
+runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { + try { +val creds = getRenewedDelegationTokens(conf) +broadcastDelegationTokens(creds) + } catch { +case e: Exception => + // Log the error and try to write new tokens back in an hour + logWarning("Couldn't broadcast tokens, trying agin in 20 seconds", e) + credentialRenewerThread.schedule(this, 20, TimeUnit.SECONDS) + return + } + scheduleRenewal(this) +} + } +scheduleRenewal(credentialRenewerRunnable) + } + + private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = { +logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", null)}") +// Get new delegation tokens by logging in with a new UGI +// inspired by AMCredentialRenewer.scala:L174 +val ugi = if (mode == "keytab") { + UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, secretFile) +} else { + UserGroupInformation.getUGIFromTicketCache(secretFile, principal) +} +val tempCreds =
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r139841458 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -159,6 +159,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.getExecutorsAliveOnHost(host).foreach { exec => killExecutors(exec.toSeq, replace = true, force = true) } + + case UpdateDelegationTokens(tokens) => +logDebug("Asking each executor to update HDFS delegation tokens") +for ((x, executorData) <- executorDataMap) { --- End diff -- Alternatively, iterate over the values directly: `executorDataMap.values.foreach(...)`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user susanxhuynh commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r139772286 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab64 = conf.get("spark.yarn.keytab", null) +val tgt64 = System.getenv("KRB5CCNAME") +require(keytab64 != null || tgt64 != null, "keytab or tgt required") +require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time") +val mode = if (keytab64 != null) "keytab" else "tgt" +val secretFile = if (keytab64 != null) keytab64 else tgt64 +logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens") +logDebug(s"secretFile is $secretFile") +(secretFile, mode) + } + + def scheduleTokenRenewal(): Unit = { +def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") 
+runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { + try { +val creds = getRenewedDelegationTokens(conf) +broadcastDelegationTokens(creds) + } catch { +case e: Exception => + // Log the error and try to write new tokens back in an hour --- End diff -- Comment says "an hour" but code has 20 seconds. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user susanxhuynh commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r139726573 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab64 = conf.get("spark.yarn.keytab", null) +val tgt64 = System.getenv("KRB5CCNAME") +require(keytab64 != null || tgt64 != null, "keytab or tgt required") +require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time") +val mode = if (keytab64 != null) "keytab" else "tgt" +val secretFile = if (keytab64 != null) keytab64 else tgt64 +logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens") +logDebug(s"secretFile is $secretFile") +(secretFile, mode) + } + + def scheduleTokenRenewal(): Unit = { +def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") 
+runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { + try { +val creds = getRenewedDelegationTokens(conf) +broadcastDelegationTokens(creds) + } catch { +case e: Exception => + // Log the error and try to write new tokens back in an hour + logWarning("Couldn't broadcast tokens, trying agin in 20 seconds", e) --- End diff -- (sp) "ag**a**in" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user susanxhuynh commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r139779444 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -159,6 +159,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.getExecutorsAliveOnHost(host).foreach { exec => killExecutors(exec.toSeq, replace = true, force = true) } + + case UpdateDelegationTokens(tokens) => +logDebug("Asking each executor to update HDFS delegation tokens") +for ((x, executorData) <- executorDataMap) { --- End diff -- `(_, executorData)` would be more Scala-like. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r139642261 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -194,6 +198,24 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +if (principal != null) { + logDebug(s"Principal found ($principal) starting token renewer") + val credentialRenewerThread = new Thread { +setName("MesosCredentialRenewer") +override def run(): Unit = { + val credentialRenewer = +new MesosCredentialRenewer( + conf, + hadoopDelegationTokenManager.get, + MesosCredentialRenewer.getTokenRenewalInterval(hadoopDelegationCreds.get, conf), --- End diff -- Regarding `hadoopDelegationCreds.get`: should we check it against `None` first? The creds are loaded in CoarseGrainedSchedulerBackend, but if they are missing, should we fail here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r139640414 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +de: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get("spark.yarn.principal") + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab64 = conf.get("spark.yarn.keytab", null) +val tgt64 = System.getenv("KRB5CCNAME") +require(keytab64 != null || tgt64 != null, "keytab or tgt required") +require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time") +val mode = if (keytab64 != null) "keytab" else "tgt" +val secretFile = if (keytab64 != null) keytab64 else tgt64 +logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens") +logDebug(s"secretFile is $secretFile") +(secretFile, mode) + } + + def scheduleTokenRenewal(): Unit = { +def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") 
+runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { + try { +val creds = getRenewedDelegationTokens(conf) +broadcastDelegationTokens(creds) + } catch { +case e: Exception => + // Log the error and try to write new tokens back in an hour + logWarning("Couldn't broadcast tokens, trying agin in 20 seconds", e) + credentialRenewerThread.schedule(this, 20, TimeUnit.SECONDS) + return + } + scheduleRenewal(this) +} + } +scheduleRenewal(credentialRenewerRunnable) + } + + private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = { +logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", null)}") +// Get new delegation tokens by logging in with a new UGI +// inspired by AMCredentialRenewer.scala:L174 +val ugi = if (mode == "keytab") { + UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, secretFile) +} else { + UserGroupInformation.getUGIFromTicketCache(secretFile, principal) +} +val tempCreds =
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r139637343 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -159,6 +159,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.getExecutorsAliveOnHost(host).foreach { exec => killExecutors(exec.toSeq, replace = true, force = true) } + + case UpdateDelegationTokens(tokens) => +logDebug("Asking each executor to update HDFS delegation tokens") +for ((x, executorData) <- executorDataMap) { + executorData.executorEndpoint.send(UpdateDelegationTokens(tokens)) +} + --- End diff -- Remove this trailing blank line. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r139636800 --- Diff: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala --- @@ -174,6 +178,16 @@ private[spark] class CoarseGrainedExecutorBackend( private[spark] object CoarseGrainedExecutorBackend extends Logging { + private def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) { --- End diff -- Should we move this to SparkHadoopUtil and re-use methods such as: https://github.com/apache/spark/blob/d7b1fcf8f0a267322af0592b2cb31f1c8970fb16/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L131 ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org