[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19272


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-15 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r151239494
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+    conf: SparkConf,
+    hadoopConfig: Configuration,
+    driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  require(driverEndpoint != null, "DriverEndpoint is not initialized")
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+    new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+    try {
+      val creds = UserGroupInformation.getCurrentUser.getCredentials
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+      val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+      logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
+      (SparkHadoopUtil.get.serialize(creds), rt)
+    } catch {
+      case e: Exception =>
+        logError(s"Failed to fetch Hadoop delegation tokens $e")
+        throw e
+    }
+  }
+
+  private val keytabFile: Option[String] = conf.get(config.KEYTAB)
+
+  scheduleTokenRenewal()
+
+  private def scheduleTokenRenewal(): Unit = {
+    if (keytabFile.isDefined) {
+      require(principal != null, "Principal is required for Keytab-based authentication")
+      logInfo(s"Using keytab: ${keytabFile.get} and principal $principal")
+    } else {
+      logInfo("Using ticket cache for Kerberos authentication, no token renewal.")
+      return
+    }
+
+    def scheduleRenewal(runnable: Runnable): Unit = {
+      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+      if (remainingTime <= 0) {
+        logInfo("Credentials have expired, creating new ones now.")
+        runnable.run()
+      } else {
+        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
+      }
+    }
+
+    val credentialRenewerRunnable =
+      new Runnable {
+        override def run(): Unit = {
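The quoted runnable is truncated here. As a minimal sketch, assuming helper names (getNewDelegationTokens, broadcastDelegationTokens) that are not part of the quoted excerpt, the keytab renewal loop described in the scaladoc looks roughly like:

    val credentialRenewerRunnable = new Runnable {
      override def run(): Unit = {
        try {
          // Re-login from the keytab, obtain fresh tokens, and push the
          // serialized credentials to executors through the driver endpoint.
          getNewDelegationTokens()
          broadcastDelegationTokens(tokens)
        } catch {
          case e: Exception =>
            // Keep the renewal thread alive and retry later.
            logWarning("Couldn't broadcast new delegation tokens, retrying in an hour", e)
            credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
            return
        }
        // timeOfNextRenewal now points at the next 75% mark; reschedule.
        scheduleRenewal(this)
      }
    }
    scheduleRenewal(credentialRenewerRunnable)

The broadcast step would amount to driverEndpoint.send(UpdateDelegationTokens(tokens)), which is what the UpdateDelegationTokens import at the top of the quoted file is for.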

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r151219050
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
@@ -0,0 +1,157 @@

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r151216053
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---
@@ -463,6 +474,19 @@ object SparkHadoopUtil {
   }
 
   /**
+   * Given an expiration date (e.g. for Hadoop Delegation Tokens) return the date
+   * when a given fraction of the duration until the expiration date has passed.
+   * Formula: current time + (fraction * (time until expiration))
+   * @param expirationDate Drop-dead expiration date
+   * @param fraction fraction of the time until expiration
+   * @return Date when the fraction of the time until expiration has passed
+   */
+  def getDateOfNextUpdate(expirationDate: Long, fraction: Double): Long = {
--- End diff --

Add `private[spark]`.
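With that modifier applied, and using the formula from the scaladoc (current time + fraction * time until expiration), the body reduces to a couple of lines; a sketch, not the committed implementation:

    private[spark] def getDateOfNextUpdate(expirationDate: Long, fraction: Double): Long = {
      val currentTime = System.currentTimeMillis()
      // E.g. fraction = 0.75 on a token expiring in 24 hours schedules renewal in 18 hours,
      // the 75%-of-interval point the Mesos manager's scaladoc refers to.
      (currentTime + fraction * (expirationDate - currentTime)).toLong
    }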


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r151217882
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
@@ -0,0 +1,157 @@

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r151216025
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---
@@ -140,13 +140,24 @@ class SparkHadoopUtil extends Logging {
     if (!new File(keytabFilename).exists()) {
       throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
     } else {
-      logInfo("Attempting to login to Kerberos" +
-        s" using principal: ${principalName} and keytab: ${keytabFilename}")
+      logInfo("Attempting to login to Kerberos " +
+        s"using principal: ${principalName} and keytab: ${keytabFilename}")
       UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
     }
   }
 
   /**
+   * Add or overwrite current user's credentials with serialized delegation tokens,
+   * also confirms correct hadoop configuration is set.
+   */
+  def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
--- End diff --

Always forget this class is public. Add `private[spark]`.
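A sketch of a body consistent with that scaladoc, with the suggested modifier applied; the deserialize counterpart to the serialize call quoted elsewhere in this thread is an assumption, not the committed code:

    private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
      // Confirm UGI sees a Kerberos-aware Hadoop configuration.
      UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
      // Merge the deserialized tokens into the current user's credentials;
      // entries with the same alias are overwritten.
      val creds = deserialize(tokens)
      UserGroupInformation.getCurrentUser.addCredentials(creds)
    }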


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-14 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150942920
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+    conf: SparkConf,
+    hadoopConfig: Configuration,
+    driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  require(driverEndpoint != null, "DriverEndpoint is not initialized")
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+    new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile: Option[String], mode: String) = getSecretFile(conf)
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+    try {
+      val creds = UserGroupInformation.getCurrentUser.getCredentials
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+      val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+      logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
+      (SparkHadoopUtil.get.serialize(creds), rt)
+    } catch {
+      case e: Exception =>
+        logError("Failed to initialize Hadoop delegation tokens\n" +
+          s"\tPrincipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e")
+        throw e
+    }
+  }
+
+  scheduleTokenRenewal()
+
+  private def getSecretFile(conf: SparkConf): (Option[String], String) = {
+    val keytab = conf.get(config.KEYTAB)
+    val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR))
+    val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) {
+      // if a keytab and a specific ticket cache are specified, use the keytab and log the behavior
+      logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " +
+        s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})")
+      (keytab, "keytab")
+    } else {
+      val m = if (keytab.isDefined) "keytab" else "tgt"
+      val sf = if (keytab.isDefined) keytab else tgt
+      (sf, m)
+    }
+
+    if (principal == null) {
+      require(mode == "tgt", s"Must specify a principal when using a Keytab, was $principal")
+      logInfo(s"Using ticket

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150934881
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
@@ -0,0 +1,185 @@

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-14 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150933634
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
@@ -0,0 +1,185 @@

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150717225
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
@@ -0,0 +1,185 @@

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-13 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150711597
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
@@ -0,0 +1,185 @@

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150711225
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
@@ -0,0 +1,185 @@

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-13 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150688646
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
@@ -0,0 +1,185 @@

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150053337
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+    conf: SparkConf, hadoopConfig: Configuration,
+    driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+    new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile: String, mode: String) = getSecretFile(conf)
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+    try {
+      val creds = UserGroupInformation.getCurrentUser.getCredentials
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+      val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+      logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
+      (SparkHadoopUtil.get.serialize(creds), rt)
+    } catch {
+      case e: Exception =>
+        throw new IllegalStateException("Failed to initialize Hadoop delegation tokens\n" +
+          s"\tPrincipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e")
+    }
+  }
+
+  scheduleTokenRenewal()
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+    val keytab = conf.get(config.KEYTAB).orNull
+    val tgt = conf.getenv("KRB5CCNAME")
+    require(keytab != null || tgt != null, "A keytab or TGT required.")
--- End diff --

Is that really the case? `KRB5CCNAME` is not a required env variable. It has a default value, and the `UGI` class will use the credentials from the default location if they're available (reloading the cache periodically).

So I think you don't really need this; you just need to track whether there's a principal and keytab. And you don't need to call `getUGIFromTicketCache` later on since I'm pretty sure UGI takes care of that for you.
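A sketch of the simplification being suggested: keep only a principal/keytab check and let UGI fall back to the default ticket cache on its own (the helper name here is illustrative, not part of the PR):

    private def keytabConfigured(conf: SparkConf): Boolean = {
      val keytab = conf.get(config.KEYTAB)
      val principal = conf.get(config.PRINCIPAL)
      require(keytab.isEmpty || principal.isDefined, "A principal must accompany a keytab")
      // No KRB5CCNAME check needed: UGI reads the default ticket cache itself.
      keytab.isDefined
    }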


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150054048
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop 
delegation tokens on the behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN 
AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal 
interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+conf: SparkConf, hadoopConfig: Configuration,
+driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile: String, mode: String) = getSecretFile(conf)
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+try {
+  val creds = UserGroupInformation.getCurrentUser.getCredentials
+  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+  logInfo(s"Initialized tokens: 
${SparkHadoopUtil.get.dumpTokens(creds)}")
+  (SparkHadoopUtil.get.serialize(creds), rt)
+} catch {
+  case e: Exception =>
+throw new IllegalStateException("Failed to initialize Hadoop 
delegation tokens\n" +
+  s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file 
$secretFile\n\tException: $e")
+}
+  }
+
+  scheduleTokenRenewal()
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab = conf.get(config.KEYTAB).orNull
+val tgt = conf.getenv("KRB5CCNAME")
+require(keytab != null || tgt != null, "A keytab or TGT required.")
+// if both Keytab and TGT are detected we use the Keytab.
+val (secretFile, mode) = if (keytab != null && tgt != null) {
+  logWarning(s"Keytab and TGT were detected, using keytab, " +
+s"unset ${config.KEYTAB.key} to use TGT")
+  (keytab, "keytab")
+} else {
+  val m = if (keytab != null) "keytab" else "tgt"
+  val sf = if (keytab != null) keytab else tgt
+  (sf, m)
+}
+
+if (principal == null) {
+  logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens")
+} else {
+  logInfo(s"Using principal: $principal with mode: $mode to retrieve 
Hadoop delegation tokens")
+}
+
+logDebug(s"secretFile is 

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150049113
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 ---
@@ -123,6 +123,9 @@ private[spark] class CoarseGrainedExecutorBackend(
   executor.stop()
 }
   }.start()
+
+case UpdateDelegationTokens(tokenBytes) =>
+  SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
--- End diff --

Can you add a `logInfo` saying the tokens are being updated? This has 
always been helpful when debugging issues with this feature on YARN.
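
For illustration, a minimal sketch of the requested change (the exact log 
wording is an assumption, not the merged code; `logInfo` comes from the 
`Logging` trait and `env.conf` is as in the diff above):

```scala
case UpdateDelegationTokens(tokenBytes) =>
  // Log before applying so token-refresh issues are visible in executor logs.
  logInfo(s"Received ${tokenBytes.length} bytes of new delegation tokens, updating credentials")
  SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
```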


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150051400
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop 
delegation tokens on behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN 
AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal 
interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+conf: SparkConf, hadoopConfig: Configuration,
+driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile: String, mode: String) = getSecretFile(conf)
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+try {
+  val creds = UserGroupInformation.getCurrentUser.getCredentials
+  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+  logInfo(s"Initialized tokens: 
${SparkHadoopUtil.get.dumpTokens(creds)}")
+  (SparkHadoopUtil.get.serialize(creds), rt)
+} catch {
+  case e: Exception =>
+throw new IllegalStateException("Failed to initialize Hadoop 
delegation tokens\n" +
+  s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file 
$secretFile\n\tException: $e")
+}
+  }
+
+  scheduleTokenRenewal()
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab = conf.get(config.KEYTAB).orNull
+val tgt = conf.getenv("KRB5CCNAME")
+require(keytab != null || tgt != null, "A keytab or TGT is required.")
+// if both Keytab and TGT are detected we use the Keytab.
+val (secretFile, mode) = if (keytab != null && tgt != null) {
+  logWarning(s"Keytab and TGT were detected, using keytab, " +
+s"unset ${config.KEYTAB.key} to use TGT")
+  (keytab, "keytab")
+} else {
+  val m = if (keytab != null) "keytab" else "tgt"
+  val sf = if (keytab != null) keytab else tgt
+  (sf, m)
+}
+
+if (principal == null) {
+  logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens")
+} else {
+  logInfo(s"Using principal: $principal with mode: $mode to retrieve 
Hadoop delegation tokens")
+}
+
+logDebug(s"secretFile is 

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150052163
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop 
delegation tokens on behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN 
AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal 
interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+conf: SparkConf, hadoopConfig: Configuration,
+driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile: String, mode: String) = getSecretFile(conf)
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+try {
+  val creds = UserGroupInformation.getCurrentUser.getCredentials
+  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+  logInfo(s"Initialized tokens: 
${SparkHadoopUtil.get.dumpTokens(creds)}")
+  (SparkHadoopUtil.get.serialize(creds), rt)
+} catch {
+  case e: Exception =>
+throw new IllegalStateException("Failed to initialize Hadoop 
delegation tokens\n" +
+  s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file 
$secretFile\n\tException: $e")
--- End diff --

Use `e` as the cause of the exception you're throwing.
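
A sketch of the fix, assuming the surrounding catch block from the diff above; 
`IllegalStateException(String, Throwable)` is the standard JDK constructor 
that preserves the original stack trace:

```scala
case e: Exception =>
  // Pass `e` as the cause instead of flattening it into the message string.
  throw new IllegalStateException(
    s"Failed to initialize Hadoop delegation tokens\n" +
      s"\tPrincipal: $principal\n\tmode: $mode\n\tsecret file $secretFile", e)
```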


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150053751
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop 
delegation tokens on behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN 
AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal 
interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+conf: SparkConf, hadoopConfig: Configuration,
+driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile: String, mode: String) = getSecretFile(conf)
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+try {
+  val creds = UserGroupInformation.getCurrentUser.getCredentials
+  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+  logInfo(s"Initialized tokens: 
${SparkHadoopUtil.get.dumpTokens(creds)}")
+  (SparkHadoopUtil.get.serialize(creds), rt)
+} catch {
+  case e: Exception =>
+throw new IllegalStateException("Failed to initialize Hadoop 
delegation tokens\n" +
+  s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file 
$secretFile\n\tException: $e")
+}
+  }
+
+  scheduleTokenRenewal()
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab = conf.get(config.KEYTAB).orNull
+val tgt = conf.getenv("KRB5CCNAME")
+require(keytab != null || tgt != null, "A keytab or TGT is required.")
+// if both Keytab and TGT are detected we use the Keytab.
+val (secretFile, mode) = if (keytab != null && tgt != null) {
+  logWarning(s"Keytab and TGT were detected, using keytab, " +
+s"unset ${config.KEYTAB.key} to use TGT")
+  (keytab, "keytab")
+} else {
+  val m = if (keytab != null) "keytab" else "tgt"
+  val sf = if (keytab != null) keytab else tgt
+  (sf, m)
+}
+
+if (principal == null) {
+  logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens")
+} else {
+  logInfo(s"Using principal: $principal with mode: $mode to retrieve 
Hadoop delegation tokens")
+}
+
+logDebug(s"secretFile is 

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150054305
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop 
delegation tokens on behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN 
AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal 
interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+conf: SparkConf, hadoopConfig: Configuration,
+driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile: String, mode: String) = getSecretFile(conf)
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+try {
+  val creds = UserGroupInformation.getCurrentUser.getCredentials
+  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+  logInfo(s"Initialized tokens: 
${SparkHadoopUtil.get.dumpTokens(creds)}")
+  (SparkHadoopUtil.get.serialize(creds), rt)
+} catch {
+  case e: Exception =>
+throw new IllegalStateException("Failed to initialize Hadoop 
delegation tokens\n" +
+  s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file 
$secretFile\n\tException: $e")
+}
+  }
+
+  scheduleTokenRenewal()
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab = conf.get(config.KEYTAB).orNull
+val tgt = conf.getenv("KRB5CCNAME")
+require(keytab != null || tgt != null, "A keytab or TGT is required.")
+// if both Keytab and TGT are detected we use the Keytab.
+val (secretFile, mode) = if (keytab != null && tgt != null) {
+  logWarning(s"Keytab and TGT were detected, using keytab, " +
+s"unset ${config.KEYTAB.key} to use TGT")
+  (keytab, "keytab")
+} else {
+  val m = if (keytab != null) "keytab" else "tgt"
+  val sf = if (keytab != null) keytab else tgt
+  (sf, m)
+}
+
+if (principal == null) {
+  logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens")
+} else {
+  logInfo(s"Using principal: $principal with mode: $mode to retrieve 
Hadoop delegation tokens")
+}
+
+logDebug(s"secretFile is 

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150051073
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop 
delegation tokens on behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN 
AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal 
interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+conf: SparkConf, hadoopConfig: Configuration,
+driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile: String, mode: String) = getSecretFile(conf)
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+try {
+  val creds = UserGroupInformation.getCurrentUser.getCredentials
+  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+  logInfo(s"Initialized tokens: 
${SparkHadoopUtil.get.dumpTokens(creds)}")
+  (SparkHadoopUtil.get.serialize(creds), rt)
+} catch {
+  case e: Exception =>
+throw new IllegalStateException("Failed to initialize Hadoop 
delegation tokens\n" +
+  s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file 
$secretFile\n\tException: $e")
+}
+  }
+
+  scheduleTokenRenewal()
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab = conf.get(config.KEYTAB).orNull
+val tgt = conf.getenv("KRB5CCNAME")
+require(keytab != null || tgt != null, "A keytab or TGT is required.")
+// if both Keytab and TGT are detected we use the Keytab.
+val (secretFile, mode) = if (keytab != null && tgt != null) {
+  logWarning(s"Keytab and TGT were detected, using keytab, " +
+s"unset ${config.KEYTAB.key} to use TGT")
+  (keytab, "keytab")
+} else {
+  val m = if (keytab != null) "keytab" else "tgt"
+  val sf = if (keytab != null) keytab else tgt
+  (sf, m)
+}
+
+if (principal == null) {
+  logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens")
--- End diff --

You should probably assert that mode is "tgt" in this case.
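
A sketch of the suggested assertion, assuming the `getSecretFile` body from 
the diff above:

```scala
if (principal == null) {
  // With no principal configured, only the TGT path should be possible.
  require(mode == "tgt", s"Expected mode 'tgt' when no principal is set, got: $mode")
  logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens")
}
```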


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150049758
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -236,7 +234,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 val reply = SparkAppConfig(
   sparkProperties,
   SparkEnv.get.securityManager.getIOEncryptionKey(),
-  hadoopDelegationCreds)
+  hadoopDelegationTokens.apply())
--- End diff --

Can't you just call `fetchHadoopDelegationTokens()` directly?
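
A sketch of the suggestion, with names taken from this thread 
(`fetchHadoopDelegationTokens` is the renamed method discussed below):

```scala
case RetrieveSparkAppConfig =>
  val reply = SparkAppConfig(
    sparkProperties,
    SparkEnv.get.securityManager.getIOEncryptionKey(),
    fetchHadoopDelegationTokens())  // call the method directly, no stored function value
  context.reply(reply)
```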


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150050571
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop 
delegation tokens on behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN 
AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal 
interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+conf: SparkConf, hadoopConfig: Configuration,
--- End diff --

One arg per line.
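
For reference, the constructor reformatted with one argument per line, as 
requested:

```scala
private[spark] class MesosHadoopDelegationTokenManager(
    conf: SparkConf,
    hadoopConfig: Configuration,
    driverEndpoint: RpcEndpointRef)
  extends Logging {
```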


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150049429
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -159,6 +151,12 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
   killExecutors(exec.toSeq, replace = true, force = true)
 }
+
+  case UpdateDelegationTokens(newDelegationTokens) =>
+// Update the driver's delegation tokens in case new executors are 
added later.
--- End diff --

Stale comment?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-08 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149797247
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,14 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far, then start the token renewer
+if (hadoopDelegationTokens.isDefined) {
--- End diff --

Check out the patch now. `hadoopDelegationTokens` now calls 
`initializeHadoopDelegationTokens` (renamed `fetchHadoopDelegationTokens`) by 
name:
```scala
  private val hadoopDelegationTokens: () => Option[Array[Byte]] = 
fetchHadoopDelegationTokens
```
 This has the effect of only generating the first set of delegation tokens 
once the first `RetrieveSparkAppConfig` message is received. At this point, 
everything has been initialized because the renewer (renamed 
`MesosHadoopDelegationTokenManager`) is evaluated lazily with the correct 
`driverEndpoint`. 

It's a bit confusing to just avoid an extra conditional. WDYT? 
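
The deferral the snippet relies on can be seen in isolation; a self-contained 
sketch (not Spark code) of assigning a method as a function value:

```scala
var fetched = false
def fetchTokens(): Option[Array[Byte]] = { fetched = true; Some(Array[Byte](1)) }

// Eta-expansion: the method becomes a function value without being invoked.
val tokens: () => Option[Array[Byte]] = fetchTokens _
assert(!fetched)   // nothing has run yet
tokens()           // first use triggers the fetch
assert(fetched)
```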


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-08 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149775044
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,14 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far, then start the token renewer
+if (hadoopDelegationTokens.isDefined) {
--- End diff --

You could call `initializeHadoopDelegationTokens` in `start` after 
everything that's needed is initialized. It would also better follow the 
scheduler's lifecycle.
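
A sketch of what that could look like in the Mesos backend (the placement is 
this comment's suggestion; field and method names are taken from the thread):

```scala
override def start(): Unit = {
  super.start()
  // By now driverEndpoint and the rest of the scheduler state are initialized.
  hadoopDelegationTokens = initializeHadoopDelegationTokens()
}
```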


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-07 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149564294
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,14 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far, then start the token renewer
+if (hadoopDelegationTokens.isDefined) {
--- End diff --

I may have spoken too soon; there might be a way...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-07 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149549953
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,14 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far, then start the token renewer
+if (hadoopDelegationTokens.isDefined) {
--- End diff --

I agree that I shouldn't need to use the conditional 
`hadoopDelegationTokens.isDefined`; however, there will need to be some check 
(`UserGroupInformation.isSecurityEnabled` or similar) before passing the 
`driverEndpoint` to the renewer/manager here. When the initial tokens are 
generated `driverEndpoint` is still `None` because `start()` hasn't been called 
yet. 
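
A sketch of the kind of guard being described (the exact placement inside 
`start()` is hypothetical):

```scala
override def start(): Unit = {
  super.start()
  if (UserGroupInformation.isSecurityEnabled) {
    // Only now is driverEndpoint available to hand to the renewer.
    hadoopCredentialRenewer.scheduleTokenRenewal(driverEndpoint)
  }
}
```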


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149235352
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -99,11 +96,8 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
-  // hadoop token manager used by some sub-classes (e.g. Mesos)
-  def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = 
None
-
-  // Hadoop delegation tokens to be sent to the executors.
-  val hadoopDelegationCreds: Option[Array[Byte]] = 
getHadoopDelegationCreds()
+  // Hadoop delegation tokens to be sent to the executors, can be updated 
as necessary.
+  protected var hadoopDelegationTokens: Option[Array[Byte]] = 
initializeHadoopDelegationTokens()
--- End diff --

Why is this protected? There's no reason I can see for subclasses to need 
access to this field.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149237172
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -772,6 +783,14 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   offer.getHostname
 }
   }
+
+  override def initializeHadoopDelegationTokens(): Option[Array[Byte]] = {
+if (UserGroupInformation.isSecurityEnabled) {
+  Some(hadoopCredentialRenewer.tokens)
--- End diff --

So, seems to me that your "renewer" is doing more than just renewing 
tokens; it's also being used to generate the initial set. So aside from my 
comments about initializing the renewer here, you should also probably make 
this API a little cleaner. Right now there's too much coupling.

The renewer should do renewals only; otherwise it should be called 
something different.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149236687
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -58,8 +60,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
 with org.apache.mesos.Scheduler with MesosSchedulerUtils {
 
-  override def hadoopDelegationTokenManager: 
Option[HadoopDelegationTokenManager] =
-Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
+  private lazy val hadoopCredentialRenewer: MesosCredentialRenewer =
+new MesosCredentialRenewer(
+  conf, new HadoopDelegationTokenManager(sc.conf, 
sc.hadoopConfiguration))
--- End diff --

Why pass in a `HadoopDelegationTokenManager` if it's not used by this 
class? The renewer can create one itself.
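
A sketch of the suggested constructor shape, with the renewer building its own 
manager:

```scala
private[spark] class MesosCredentialRenewer(
    conf: SparkConf,
    hadoopConfig: Configuration) extends Logging {

  // Constructed internally instead of being injected by the backend.
  private val tokenManager = new HadoopDelegationTokenManager(conf, hadoopConfig)
  // ...
}
```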


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149236410
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosCredentialRenewer will update the Hadoop credentials for Spark 
drivers accessing
+ * secured services using Kerberos authentication. It is modeled after the 
YARN AMCredential
+ * renewer, and similarly will renew the Credentials when 75% of the 
renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager) extends Logging {
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  private val principal = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+try {
+  val creds = UserGroupInformation.getCurrentUser.getCredentials
+  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+  (SparkHadoopUtil.get.serialize(creds), rt)
+} catch {
+  case e: Exception =>
+throw new IllegalStateException("Failed to initialize Hadoop 
delegation tokens\n" +
+  s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file 
$secretFile\n\tException: $e")
+}
+
+  }
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab = conf.get(config.KEYTAB).orNull
+val tgt = conf.getenv("KRB5CCNAME")
+require(keytab != null || tgt != null, "A keytab or TGT is required.")
+// if both Keytab and TGT are detected we use the Keytab.
+val (secretFile, mode) = if (keytab != null && tgt != null) {
+  logWarning(s"Keytab and TGT were detected, using keytab, " +
+s"unset ${config.KEYTAB.key} to use TGT")
+  (keytab, "keytab")
+} else {
+  val m = if (keytab != null) "keytab" else "tgt"
+  val sf = if (keytab != null) keytab else tgt
+  (sf, m)
+}
+
+if (principal == null) {
+  logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens")
+} else {
+  logInfo(s"Using principal: $principal with mode: $mode to retrieve 
Hadoop delegation tokens")
+}
+
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(driverEndpoint: RpcEndpointRef): Unit = {
--- End diff --

Why isn't this done in the constructor? There's a single call to this 
method, and the renewal interval could very easily be turned into a constructor 
arg.
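
A sketch of the suggested restructuring: make the endpoint a constructor 
argument and schedule renewal at construction time, so the single external 
call disappears:

```scala
private[spark] class MesosCredentialRenewer(
    conf: SparkConf,
    tokenManager: HadoopDelegationTokenManager,
    driverEndpoint: RpcEndpointRef) extends Logging {

  // ...fields and token initialization as in the diff above...

  scheduleTokenRenewal()  // runs once, when the instance is created

  private def scheduleTokenRenewal(): Unit = {
    // body unchanged, but driverEndpoint now comes from the constructor
  }
}
```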


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149235095
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -55,7 +55,9 @@ private[spark] class HadoopDelegationTokenManager(
   logDebug(s"Using the following delegation token providers: " +
 s"${delegationTokenProviders.keys.mkString(", ")}.")
 
-  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop 
filesystem */
+  /**
--- End diff --

This is not really changing anything, so I'd just revert changes to this 
file. Or, if you really want to, just keep the new `@param`s you're adding 
below.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149235641
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,14 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far, then start the token renewer
+if (hadoopDelegationTokens.isDefined) {
--- End diff --

You shouldn't do this here, otherwise you need to keep that field 
`protected` in the parent class and that adds unnecessary coupling. Instead, do 
this in `initializeHadoopDelegationTokens`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149235832
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosCredentialRenewer will update the Hadoop credentials for Spark 
drivers accessing
+ * secured services using Kerberos authentication. It is modeled after the 
YARN AMCredential
+ * renewer, and similarly will renew the Credentials when 75% of the 
renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+class MesosCredentialRenewer(
--- End diff --

`private[spark]`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149235915
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosCredentialRenewer will update the Hadoop credentials for Spark 
drivers accessing
+ * secured services using Kerberos authentication. It is modeled after the 
YARN AMCredential
+ * renewer, and similarly will renew the Credentials when 75% of the 
renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager) extends Logging {
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  private val principal = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
--- End diff --

`private`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-05 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r148982457
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -58,8 +62,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
 with org.apache.mesos.Scheduler with MesosSchedulerUtils {
 
-  override def hadoopDelegationTokenManager: 
Option[HadoopDelegationTokenManager] =
-Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
+  private lazy val hadoopCredentialRenewer: MesosCredentialRenewer =
--- End diff --

For Mesos the credential renewer contains the tokens, the renewal time, and 
all the logic to renew the tokens. It should never be evaluated (and the 
tokens never initialized) if `UserGroupInformation.isSecurityEnabled` 
evaluates to `false`.
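
A self-contained sketch (not Spark code) of why a `lazy val` gives that 
guarantee:

```scala
class Renewer {
  println("tokens fetched")                  // side effect on construction
  def schedule(): Unit = println("renewal scheduled")
}

class Backend(secure: Boolean) {
  private lazy val renewer = new Renewer     // not constructed when Backend is built
  def start(): Unit = if (secure) renewer.schedule()
}

new Backend(secure = false).start()          // prints nothing
new Backend(secure = true).start()           // prints both lines
```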


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-05 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r148982255
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -686,18 +687,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 true
   }
 
-  protected def getHadoopDelegationCreds(): Option[Array[Byte]] = {
--- End diff --

This method was only called once, and would discard the renewal time 
information, limiting its utility.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-05 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r148982165
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -99,11 +96,8 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
-  // hadoop token manager used by some sub-classes (e.g. Mesos)
-  def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = 
None
--- End diff --

No longer needed because resource-manager backends (may) implement their 
own `initializeHadoopDelegationTokens`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r148341472
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -102,8 +99,15 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   // hadoop token manager used by some sub-classes (e.g. Mesos)
   def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = 
None
 
-  // Hadoop delegation tokens to be sent to the executors.
-  val hadoopDelegationCreds: Option[Array[Byte]] = 
getHadoopDelegationCreds()
+  def hadoopCredentialRenewer: Option[HadoopCredentialRenewer] = None
--- End diff --

Why is this needed here? This class doesn't use it at all.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r148337519
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopCredentialRenewer.scala
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.spark.internal.Logging
+
+/**
+ * Abstract class for credential renewers used by YARN and Mesos. To renew 
delegation tokens
+ * the scheduler backend calls [[scheduleTokenRenewal]]. The 
implementation of this method
+ * (and the dispersal of tokens) is resource-manager-specific, see 
implementations of this
+ * class for details.
+ */
+abstract class HadoopCredentialRenewer extends Logging {
--- End diff --

`private[spark]`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r148339089
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -75,6 +78,17 @@ private[spark] class HadoopDelegationTokenManager(
   .toMap
   }
 
+  def getRenewableDelegationTokens(): Option[RenewableDelegationTokens] = {
--- End diff --

I'm not too fond of the idea of having two methods for creating tokens 
(this and `obtainDelegationTokens`). One should be enough.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r148342319
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -102,8 +99,15 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   // hadoop token manager used by some sub-classes (e.g. Mesos)
   def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = 
None
 
-  // Hadoop delegation tokens to be sent to the executors.
-  val hadoopDelegationCreds: Option[Array[Byte]] = 
getHadoopDelegationCreds()
+  def hadoopCredentialRenewer: Option[HadoopCredentialRenewer] = None
+
+  // Hadoop delegation tokens to be sent to the executors, can be updated 
as necessary.
+  var renewableDelegationTokens: Option[RenewableDelegationTokens] =
--- End diff --

This is another part where the interface is brittle and a bit confusing. 
This should really be a private field, because there should be a single way for 
sub-classes to update it (send a `UpdateDelegationTokens` message). But it's 
not because Mesos needs to poke at it because the rest of the API is all a bit 
weird.

If instead you follow my original suggestion (have a abstract 
`initializeDelegationTokens()` method that Mesos overrides), then everything 
becomes a lot more localized and cleaner.
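
A sketch of the proposed shape, with names from this thread (the bodies are 
assumptions, not the merged code):

```scala
// In CoarseGrainedSchedulerBackend: a hook subclasses can override, plus a
// private field that is only ever updated via UpdateDelegationTokens.
protected def initializeHadoopDelegationTokens(): Option[Array[Byte]] = None
private var hadoopDelegationTokens: Option[Array[Byte]] = initializeHadoopDelegationTokens()

// In MesosCoarseGrainedSchedulerBackend:
override def initializeHadoopDelegationTokens(): Option[Array[Byte]] =
  if (UserGroupInformation.isSecurityEnabled) Some(hadoopCredentialRenewer.tokens) else None
```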


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r148338530
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -125,3 +141,5 @@ private[spark] class HadoopDelegationTokenManager(
 }.foldLeft(Long.MaxValue)(math.min)
   }
 }
+
+case class RenewableDelegationTokens(credentials: Array[Byte], 
nextRenewalTime: Long)
--- End diff --

This is not a great name for this class; it doesn't provide any 
functionality related to renewing the tokens, so it's not really clear why it's 
called "renewable".

If this actually wrapped the `HadoopDelegationTokenManager` and provided a 
`renew` method or something, that would be better. But at the same time this is 
starting to make this API too confusing.

So two options here: either use the existing API and make the caller hold 
on to the two pieces of info (tokens and renewal time), or change the existing 
API. But don't add a new API that basically does the same thing.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r148337992
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -55,7 +56,9 @@ private[spark] class HadoopDelegationTokenManager(
   logDebug(s"Using the following delegation token providers: " +
 s"${delegationTokenProviders.keys.mkString(", ")}.")
 
-  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop 
filesystem */
+  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop 
filesystem with a
--- End diff --

Comment starts on next line. But the comment doesn't really seem accurate 
anyway.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-31 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r148013037
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -233,10 +249,15 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 context.reply(true)
 
   case RetrieveSparkAppConfig =>
+val tokens = if (renewableDelegationTokens.isDefined) {
+  Some(renewableDelegationTokens.get.credentials)
+} else {
+  None
+}
 val reply = SparkAppConfig(
   sparkProperties,
   SparkEnv.get.securityManager.getIOEncryptionKey(),
-  hadoopDelegationCreds)
+  tokens)
--- End diff --

could make `SparkAppConfig` just take a `RenewableDelegationTokens` object? 
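
Whichever way it goes, the `isDefined`/`get` pair above collapses to a plain `Option.map` (sketch, using the names from the diff):

```scala
case RetrieveSparkAppConfig =>
  val reply = SparkAppConfig(
    sparkProperties,
    SparkEnv.get.securityManager.getIOEncryptionKey(),
    // Extract the serialized tokens only when they exist.
    renewableDelegationTokens.map(_.credentials))
  context.reply(reply)
```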


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-30 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r147866441
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
 ---
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.SparkContext
--- End diff --

`SparkException` is unused, not sure why it was there in the first place


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146946137
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,24 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && currentHadoopDelegationTokens.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  // The renewal time is ignored when getting the initial delegation 
tokens
+  // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), 
so we get the renewal
+  // time here and schedule a thread to renew them.
+  val renewalTime =
--- End diff --

But do note that you still can't start the token renewer automatically, 
since YARN still has its own renewer and things would get mixed up.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146943847
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,24 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && currentHadoopDelegationTokens.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  // The renewal time is ignored when getting the initial delegation 
tokens
+  // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), 
so we get the renewal
+  // time here and schedule a thread to renew them.
+  val renewalTime =
--- End diff --

I guess that would be ok. I was a little worried about mixing the two but 
they are kinda closely related...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-25 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146855311
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,24 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && currentHadoopDelegationTokens.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  // The renewal time is ignored when getting the initial delegation 
tokens
+  // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), 
so we get the renewal
+  // time here and schedule a thread to renew them.
+  val renewalTime =
--- End diff --

Maybe an overly simplistic solution, but what about putting this 
functionality into `HadoopDelegationTokenManager`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-24 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146622470
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,24 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && currentHadoopDelegationTokens.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  // The renewal time is ignored when getting the initial delegation 
tokens
+  // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), 
so we get the renewal
+  // time here and schedule a thread to renew them.
+  val renewalTime =
--- End diff --

It makes more sense to have it in `CoarseGrainedSchedulerBackend` (then 
YARN and Mesos can easily share it); my worry is exposing the functionality to 
standalone (it would work, but not be secure, which is a problem).

So maybe add a second method `def supportsTokenRenewal` that subclasses can 
override to return `true` or something.
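
Sketched out, with illustrative class names:

```scala
// Base class: renewal is off by default, so the standalone backend never
// starts a renewer even though the machinery would technically run.
abstract class BackendSketch {
  protected def supportsTokenRenewal: Boolean = false

  def maybeStartTokenRenewer(): Unit = {
    if (supportsTokenRenewal) {
      // start the credential renewer here
    }
  }
}

// YARN/Mesos backends opt in:
class MesosBackendSketch extends BackendSketch {
  override protected def supportsTokenRenewal: Boolean = true
}
```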


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-24 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146622564
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
 ---
@@ -59,8 +59,7 @@ private[yarn] class AMCredentialRenewer(
   private var lastCredentialsFileSuffix = 0
 
   private val credentialRenewer =
-Executors.newSingleThreadScheduledExecutor(
-  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh 
Thread")
--- End diff --

It's ok since you've done it, I was mostly making a generic comment.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-24 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146473582
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
 ---
@@ -59,8 +59,7 @@ private[yarn] class AMCredentialRenewer(
   private var lastCredentialsFileSuffix = 0
 
   private val credentialRenewer =
-Executors.newSingleThreadScheduledExecutor(
-  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh 
Thread")
--- End diff --

I generally agree, but I did this mostly to keep these two classes as 
congruent as possible. I can change it back if you feel strongly. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-24 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146472162
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,24 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && currentHadoopDelegationTokens.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  // The renewal time is ignored when getting the initial delegation 
tokens
+  // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), 
so we get the renewal
+  // time here and schedule a thread to renew them.
+  val renewalTime =
--- End diff --

Ok, I agree. I refrained from touching the current code only because it was 
added so recently. I assumed the omission of renewal time was intentional 
(or at least semi-intentional). I'll submit a refactor. Are you suggesting that 
we make the renewal thread part of `CoarseGrainedSchedulerBackend` (with the 
appropriate override for YARN/Mesos)?  Or make a resource-manager specific 
method?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146435148
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -159,6 +159,12 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
   killExecutors(exec.toSeq, replace = true, force = true)
 }
+
+  case UpdateDelegationTokens(tokens) =>
+// Update the driver's delegation tokens in case new executors are 
added later.
+currentHadoopDelegationTokens = Some(tokens)
+executorDataMap.values.foreach { ed =>
+  ed.executorEndpoint.send(UpdateDelegationTokens(tokens)) }
--- End diff --

`}` goes in next line.
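
That is, in Spark's multi-line closure style:

```scala
executorDataMap.values.foreach { ed =>
  ed.executorEndpoint.send(UpdateDelegationTokens(tokens))
}
```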


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146436646
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
 ---
@@ -59,8 +59,7 @@ private[yarn] class AMCredentialRenewer(
   private var lastCredentialsFileSuffix = 0
 
   private val credentialRenewer =
-Executors.newSingleThreadScheduledExecutor(
-  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh 
Thread")
--- End diff --

Normally you should avoid making changes that are not related to your PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146434999
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
 ---
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.SparkContext
--- End diff --

Change not needed?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146436571
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosCredentialRenewer will update the Hadoop credentials for Spark 
drivers accessing
+ * secured services using Kerberos authentication. It is modeled after the 
YARN AMCredential
+ * renewer, and similarly will renew the Credentials when 75% of the 
renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+driverEndpoint: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab = conf.get(config.KEYTAB).orNull
+val tgt = conf.getenv("KRB5CCNAME")
+require(keytab != null || tgt != null, "A keytab or TGT required.")
+// if both Keytab and TGT are detected we use the Keytab.
+val (secretFile, mode) = if (keytab != null && tgt != null) {
+  logWarning(s"Keytab and TGT were detected, using keytab, unset 
$keytab to use TGT")
+  (keytab, "keytab")
+} else {
+  val m = if (keytab != null) "keytab" else "tgt"
+  val sf = if (keytab != null) keytab else tgt
+  (sf, m)
+}
+logInfo(s"Logging in as $principal with mode $mode to retrieve Hadoop 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and try to write new tokens back in an hour
+  

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146436883
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosCredentialRenewer will update the Hadoop credentials for Spark 
drivers accessing
+ * secured services using Kerberos authentication. It is modeled after the 
YARN AMCredential
+ * renewer, and similarly will renew the Credentials when 75% of the 
renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+driverEndpoint: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab = conf.get(config.KEYTAB).orNull
+val tgt = conf.getenv("KRB5CCNAME")
+require(keytab != null || tgt != null, "A keytab or TGT required.")
+// if both Keytab and TGT are detected we use the Keytab.
+val (secretFile, mode) = if (keytab != null && tgt != null) {
+  logWarning(s"Keytab and TGT were detected, using keytab, unset 
$keytab to use TGT")
--- End diff --

`${KEYTAB.key}`?
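
That is, log the config key rather than the keytab path (sketch, assuming `config.KEYTAB.key` resolves to the `spark.yarn.keytab` entry):

```scala
logWarning(s"Keytab and TGT were detected, using keytab; unset ${config.KEYTAB.key} " +
  "to use the TGT")
```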


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146437027
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosCredentialRenewer will update the Hadoop credentials for Spark 
drivers accessing
+ * secured services using Kerberos authentication. It is modeled after the 
YARN AMCredential
+ * renewer, and similarly will renew the Credentials when 75% of the 
renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+driverEndpoint: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab = conf.get(config.KEYTAB).orNull
+val tgt = conf.getenv("KRB5CCNAME")
+require(keytab != null || tgt != null, "A keytab or TGT required.")
+// if both Keytab and TGT are detected we use the Keytab.
+val (secretFile, mode) = if (keytab != null && tgt != null) {
+  logWarning(s"Keytab and TGT were detected, using keytab, unset 
$keytab to use TGT")
+  (keytab, "keytab")
+} else {
+  val m = if (keytab != null) "keytab" else "tgt"
+  val sf = if (keytab != null) keytab else tgt
+  (sf, m)
+}
+logInfo(s"Logging in as $principal with mode $mode to retrieve Hadoop 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and try to write new tokens back in an hour
+  

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146436327
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,24 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && currentHadoopDelegationTokens.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  // The renewal time is ignored when getting the initial delegation 
tokens
+  // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), 
so we get the renewal
+  // time here and schedule a thread to renew them.
+  val renewalTime =
--- End diff --

I still don't like this. You should not need to implement this separate 
method of getting the renewal time just because the current code is throwing 
out that information. Instead you should fix the code so that the information 
is preserved.

`getHadoopDelegationCreds` is called in only one place, so my suggestion 
would be to encapsulate initializing the token manager and getting the initial 
set of tokens into a single method (instead of the current two).

Then in that method's implementation you can get the initial set of tokens, 
initialize the renewer thread with the correct renewal period, and return the 
data needed by the scheduler.
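
Concretely, the shape being suggested might look like this (a sketch; the helper names are assumed, not existing API):

```scala
// One entry point: obtain the initial tokens, arm the renewer with the real
// renewal time, and return the serialized tokens to the scheduler.
protected def initializeDelegationTokens(): Option[Array[Byte]] = {
  hadoopDelegationTokenManager.map { manager =>
    val creds = new org.apache.hadoop.security.Credentials()
    val nextRenewal = manager.obtainDelegationTokens(hadoopConf, creds)
    startCredentialRenewer(nextRenewal)  // hypothetical helper
    serializeCredentials(creds)          // hypothetical helper, returns Array[Byte]
  }
}
```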


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146434552
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -194,6 +198,27 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  val credentialRenewerThread = new Thread {
+setName("MesosCredentialRenewer")
+override def run(): Unit = {
+  val rt = 
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
+  val credentialRenewer =
+new MesosCredentialRenewer(
+  conf,
+  hadoopDelegationTokenManager.get,
+  MesosCredentialRenewer.getNextRenewalTime(rt),
+  driverEndpoint)
+  credentialRenewer.scheduleTokenRenewal()
+}
+  }
+
+  credentialRenewerThread.start()
+  credentialRenewerThread.join()
--- End diff --

That's the gist of it.
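
In other words (a sketch using the names from the diff above), the setup can run inline, since `scheduleTokenRenewal()` only arms a `ScheduledExecutorService` and returns:

```scala
// No wrapper thread needed: construct and schedule the renewer directly.
val rt = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
val credentialRenewer = new MesosCredentialRenewer(
  conf,
  hadoopDelegationTokenManager.get,
  MesosCredentialRenewer.getNextRenewalTime(rt),
  driverEndpoint)
credentialRenewer.scheduleTokenRenewal()
```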


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-20 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146094019
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -194,6 +198,27 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  val credentialRenewerThread = new Thread {
+setName("MesosCredentialRenewer")
+override def run(): Unit = {
+  val rt = 
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
+  val credentialRenewer =
+new MesosCredentialRenewer(
+  conf,
+  hadoopDelegationTokenManager.get,
+  MesosCredentialRenewer.getNextRenewalTime(rt),
+  driverEndpoint)
+  credentialRenewer.scheduleTokenRenewal()
+}
+  }
+
+  credentialRenewerThread.start()
+  credentialRenewerThread.join()
--- End diff --

Ok, you're probably right. It appears that the YARN code uses 
`setContextClassLoader(userClassLoader)` whereas Mesos does not have a notion 
of `userClassLoader`. Therefore we don't need the separate thread in the Mesos 
code. Do I have this correct? Thanks for showing me this!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-20 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146052502
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -194,6 +198,27 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  val credentialRenewerThread = new Thread {
+setName("MesosCredentialRenewer")
+override def run(): Unit = {
+  val rt = 
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
+  val credentialRenewer =
+new MesosCredentialRenewer(
+  conf,
+  hadoopDelegationTokenManager.get,
+  MesosCredentialRenewer.getNextRenewalTime(rt),
+  driverEndpoint)
+  credentialRenewer.scheduleTokenRenewal()
+}
+  }
+
+  credentialRenewerThread.start()
+  credentialRenewerThread.join()
--- End diff --

I don't think you really understood why the YARN code needs a thread and 
why I'm telling you this code does not. Read the comment you added here again; 
what makes you think the current thread does not have access to those classes?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-19 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r145861062
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -194,6 +198,27 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  val credentialRenewerThread = new Thread {
+setName("MesosCredentialRenewer")
+override def run(): Unit = {
+  val rt = 
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
+  val credentialRenewer =
+new MesosCredentialRenewer(
+  conf,
+  hadoopDelegationTokenManager.get,
+  MesosCredentialRenewer.getNextRenewalTime(rt),
+  driverEndpoint)
+  credentialRenewer.scheduleTokenRenewal()
+}
+  }
+
+  credentialRenewerThread.start()
+  credentialRenewerThread.join()
--- End diff --

Yes, sorry, for some reason I understood you to mean the credential 
renewer itself. I added a comment to the same effect as the YARN analogue. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r145487542
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -194,6 +198,27 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  val credentialRenewerThread = new Thread {
+setName("MesosCredentialRenewer")
+override def run(): Unit = {
+  val rt = 
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
+  val credentialRenewer =
+new MesosCredentialRenewer(
+  conf,
+  hadoopDelegationTokenManager.get,
+  MesosCredentialRenewer.getNextRenewalTime(rt),
+  driverEndpoint)
+  credentialRenewer.scheduleTokenRenewal()
+}
+  }
+
+  credentialRenewerThread.start()
+  credentialRenewerThread.join()
--- End diff --

There's a comment explaining why that thread exists right above the code 
you linked. Did you look at it?

Also, you're calling `join()` on the thread, so it's obviously going away.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-17 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r145301221
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -194,6 +198,27 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  val credentialRenewerThread = new Thread {
+setName("MesosCredentialRenewer")
+override def run(): Unit = {
+  val rt = 
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
+  val credentialRenewer =
+new MesosCredentialRenewer(
+  conf,
+  hadoopDelegationTokenManager.get,
+  MesosCredentialRenewer.getNextRenewalTime(rt),
+  driverEndpoint)
+  credentialRenewer.scheduleTokenRenewal()
+}
+  }
+
+  credentialRenewerThread.start()
+  credentialRenewerThread.join()
--- End diff --

Yes, I believe so. If you look here 
(https://github.com/apache/spark/blob/6735433cde44b3430fb44edfff58ef8149c66c13/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L271)
 it's the same pattern. The thread needs to run as long as the application 
driver, correct? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-17 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r145300847
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
--- End diff --

I also changed `AMCredentialRenewer` to the same. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-17 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r145298313
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
--- End diff --

fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r143854246
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab64 = conf.get("spark.yarn.keytab", null)
+val tgt64 = System.getenv("KRB5CCNAME")
+require(keytab64 != null || tgt64 != null, "keytab or tgt required")
+require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be 
used at the same time")
--- End diff --

`KRB5CCNAME` is something that people might have in their environment for 
various reasons, so I'd avoid this requirement.
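
One way to relax it (a sketch matching the shape the updated diff earlier in this thread adopts): require at least one of the two and prefer the keytab when both are present, rather than failing:

```scala
val keytab = conf.get(config.KEYTAB).orNull
val tgt = conf.getenv("KRB5CCNAME")
require(keytab != null || tgt != null, "A keytab or TGT is required.")
// Prefer the keytab instead of erroring out when a ticket cache also exists.
val (secretFile, mode) =
  if (keytab != null) (keytab, "keytab") else (tgt, "tgt")
```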


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r143855260
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab64 = conf.get("spark.yarn.keytab", null)
+val tgt64 = System.getenv("KRB5CCNAME")
+require(keytab64 != null || tgt64 != null, "keytab or tgt required")
+require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be 
used at the same time")
+val mode = if (keytab64 != null) "keytab" else "tgt"
+val secretFile = if (keytab64 != null) keytab64 else tgt64
+logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and try to write new tokens back in an hour
+  logWarning("Couldn't broadcast tokens, trying again in an 
hour", e)
+  credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
+  return
+  }
+  scheduleRenewal(this)
+}
+  }
+scheduleRenewal(credentialRenewerRunnable)
+  }
+
+  private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = {
+logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", 
null)}")
+// Get new delegation tokens by logging in with a new UGI
+// inspired by AMCredentialRenewer.scala:L174
+val ugi = if (mode == "keytab") {
--- End diff --

The correct way would be for the credential management code to 
differentiate between token creation and token renewal; that way it would renew 
tokens at the renewal interval and create new ones after the max lifetime. 

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r143835301
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
---
@@ -142,6 +142,13 @@ class SparkHadoopUtil extends Logging {
 }
   }
 
+  def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
--- End diff --

Add a comment about what this method is doing and why it's needed. (YARN 
never sets the authentication method, so it'd be good to know why Mesos needs 
to do it.)
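
For instance (a sketch only; the wording is illustrative, not the comment that was eventually merged):

```scala
/**
 * Deserializes the given token blob and adds the tokens to the current
 * user's UGI. Unlike YARN, Mesos has no AM-side login that establishes the
 * UGI's Kerberos authentication method, so it is configured here as well.
 */
def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf): Unit = {
  // implementation as in the diff above
}
```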


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r143838652
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
--- End diff --

Use the `PRINCIPAL` constant.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r143836011
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -60,6 +62,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   override def hadoopDelegationTokenManager: 
Option[HadoopDelegationTokenManager] =
 Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
 
+  private val principal = conf.get("spark.yarn.principal", null)
--- End diff --

This config is defined in core already (`PRINCIPAL`).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r143839588
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab64 = conf.get("spark.yarn.keytab", null)
+val tgt64 = System.getenv("KRB5CCNAME")
--- End diff --

`64`?

Also, using `conf.getenv` would allow tests to be written.
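A small sketch of that suggestion, with illustrative variable names: drop the misleading `64` suffix and read the environment through `SparkConf`, which a test can override:

```scala
// conf.getenv delegates to System.getenv by default but can be stubbed in tests.
val keytab = conf.get("spark.yarn.keytab", null)
val tgt = conf.getenv("KRB5CCNAME")
```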


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r143835387
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 ---
@@ -123,6 +123,11 @@ private[spark] class CoarseGrainedExecutorBackend(
   executor.stop()
 }
   }.start()
+
+// This message is only sent by Mesos Drivers, and is not expected 
from other
--- End diff --

No need to add this comment.


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r143836825
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -194,6 +198,27 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  val credentialRenewerThread = new Thread {
+setName("MesosCredentialRenewer")
+override def run(): Unit = {
+  val rt = 
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
+  val credentialRenewer =
+new MesosCredentialRenewer(
+  conf,
+  hadoopDelegationTokenManager.get,
+  MesosCredentialRenewer.getNextRenewalTime(rt),
+  driverEndpoint)
+  credentialRenewer.scheduleTokenRenewal()
+}
+  }
+
+  credentialRenewerThread.start()
+  credentialRenewerThread.join()
--- End diff --

Why do you need this thread if it's going to be short-lived?
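A minimal sketch of the thread-free alternative the question points at, reusing the names from the quoted diff:

```scala
// Same setup, run inline in start() instead of on a short-lived named thread.
if (principal != null && hadoopDelegationCreds.isDefined) {
  logDebug(s"Principal found ($principal) starting token renewer")
  val rt = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
  val credentialRenewer = new MesosCredentialRenewer(
    conf,
    hadoopDelegationTokenManager.get,
    MesosCredentialRenewer.getNextRenewalTime(rt),
    driverEndpoint)
  credentialRenewer.scheduleTokenRenewal()
}
```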


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r143857596
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
--- End diff --

How about a more descriptive variable name?


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r143838709
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab64 = conf.get("spark.yarn.keytab", null)
--- End diff --

Similarly there's a `KEYTAB` constant. Also why `64`?


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r143835719
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -159,6 +159,11 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
   killExecutors(exec.toSeq, replace = true, force = true)
 }
+
+  case UpdateDelegationTokens(tokens) =>
+executorDataMap.values.foreach {
+  ed => ed.executorEndpoint.send(UpdateDelegationTokens(tokens))
--- End diff --

`ed =>` goes in previous line. (The whole thing might fit in one line, too.)
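Spelled out, the one-line form would be something like:

```scala
// Same behavior, folded into a single statement.
executorDataMap.values.foreach(_.executorEndpoint.send(UpdateDelegationTokens(tokens)))
```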


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r143837225
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
--- End diff --

`ThreadUtils.newDaemonSingleThreadScheduledExecutor`?
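For reference, the suggested utility collapses the factory and thread naming into one call:

```scala
// Creates a daemon, single-threaded, named scheduled executor.
private val credentialRenewerThread =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
```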


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-10 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r143856998
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -194,6 +198,27 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  val credentialRenewerThread = new Thread {
+setName("MesosCredentialRenewer")
+override def run(): Unit = {
+  val rt = 
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
--- End diff --

So, you need this because `hadoopDelegationCreds` doesn't keep the 
information about when the tokens should be renewed (a.k.a. the return value of 
`obtainDelegationTokens`). Perhaps some minor refactoring would help clean this 
up?

In fact, `hadoopDelegationCreds` is a `val`, so any executors that start 
after the initial token set expires will fail, no? Because they'll fetch 
`hadoopDelegationCreds` from the driver, and won't get the 
`UpdateDelegationTokens` until it's way too late.
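One hedged sketch of the kind of refactoring this points at; the field and method names here are hypothetical, not the final patch:

```scala
// Hold the latest serialized tokens in a mutable field that the renewer
// refreshes, so executors registering after the first expiry still fetch
// valid credentials from the driver.
@volatile private var currentDelegationTokens: Option[Array[Byte]] = hadoopDelegationCreds

private def updateDelegationTokens(tokens: Array[Byte]): Unit = {
  currentDelegationTokens = Some(tokens)
}
```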


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-25 Thread kalvinnchau
Github user kalvinnchau commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r140830852
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -194,6 +198,26 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  val credentialRenewerThread = new Thread {
+setName("MesosCredentialRenewer")
+override def run(): Unit = {
+  val credentialRenewer =
+new MesosCredentialRenewer(
+  conf,
+  hadoopDelegationTokenManager.get,
+  
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf),
--- End diff --

This sets the first renewal time to be the expiration time of the token.

It should be similar to the way the next renewal time is calculated in the MesosCredentialRenewer class, so that the first token is renewed after 75% of the expiration time has passed:

```scala
val currTime = System.currentTimeMillis()
val renewTime =
  MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
val rt = 0.75 * (renewTime - currTime)

val credentialRenewer =
  new MesosCredentialRenewer(
    conf,
    hadoopDelegationTokenManager.get,
    (currTime + rt).toLong,
    driverEndpoint)
credentialRenewer.scheduleTokenRenewal()
```



---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-22 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r140625136
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab64 = conf.get("spark.yarn.keytab", null)
+val tgt64 = System.getenv("KRB5CCNAME")
+require(keytab64 != null || tgt64 != null, "keytab or tgt required")
+require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be 
used at the same time")
+val mode = if (keytab64 != null) "keytab" else "tgt"
+val secretFile = if (keytab64 != null) keytab64 else tgt64
+logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and try to write new tokens back in an hour
+  logWarning("Couldn't broadcast tokens, trying again in an 
hour", e)
+  credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
+  return
+  }
+  scheduleRenewal(this)
+}
+  }
+scheduleRenewal(credentialRenewerRunnable)
+  }
+
+  private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = {
+logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", 
null)}")
+// Get new delegation tokens by logging in with a new UGI
+// inspired by AMCredentialRenewer.scala:L174
+val ugi = if (mode == "keytab") {
--- End diff --

Hello @kalvinnchau. You are correct: all this does is keep track of when the tokens will expire and renew them at that time. Part of my motivation for doing this is to avoid writing any files to disk (like new

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-21 Thread kalvinnchau
Github user kalvinnchau commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r140326376
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab64 = conf.get("spark.yarn.keytab", null)
+val tgt64 = System.getenv("KRB5CCNAME")
+require(keytab64 != null || tgt64 != null, "keytab or tgt required")
+require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be 
used at the same time")
+val mode = if (keytab64 != null) "keytab" else "tgt"
+val secretFile = if (keytab64 != null) keytab64 else tgt64
+logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and try to write new tokens back in an hour
+  logWarning("Couldn't broadcast tokens, trying again in an 
hour", e)
+  credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
+  return
+  }
+  scheduleRenewal(this)
+}
+  }
+scheduleRenewal(credentialRenewerRunnable)
+  }
+
+  private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = {
+logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", 
null)}")
+// Get new delegation tokens by logging in with a new UGI
+// inspired by AMCredentialRenewer.scala:L174
+val ugi = if (mode == "keytab") {
--- End diff --

I don't see where it refreshes the delegation tokens until the max-lifetime, then re-logins with the keytab to get new delegation tokens that'll last until the max-lifetime.

Does this skip over 

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-20 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r140118143
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -63,7 +63,8 @@ class MesosCredentialRenewer(
 
   def scheduleTokenRenewal(): Unit = {
 def scheduleRenewal(runnable: Runnable): Unit = {
-  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  // val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  val remainingTime = 5000
--- End diff --

well that's embarrassing, just a debugging tool that I forgot to remove.



---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-20 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r140117834
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -198,16 +198,19 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
-if (principal != null) {
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
   logDebug(s"Principal found ($principal) starting token renewer")
   val credentialRenewerThread = new Thread {
 setName("MesosCredentialRenewer")
 override def run(): Unit = {
+  val dummy: Option[Array[Byte]] = None
--- End diff --

whoops! 


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-20 Thread susanxhuynh
Github user susanxhuynh commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r140117253
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -63,7 +63,8 @@ class MesosCredentialRenewer(
 
   def scheduleTokenRenewal(): Unit = {
 def scheduleRenewal(runnable: Runnable): Unit = {
-  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  // val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  val remainingTime = 5000
--- End diff --

Why 5000?


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-20 Thread susanxhuynh
Github user susanxhuynh commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r140117055
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -198,16 +198,19 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
-if (principal != null) {
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
   logDebug(s"Principal found ($principal) starting token renewer")
   val credentialRenewerThread = new Thread {
 setName("MesosCredentialRenewer")
 override def run(): Unit = {
+  val dummy: Option[Array[Byte]] = None
--- End diff --

What is this for?


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-20 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r140088442
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab64 = conf.get("spark.yarn.keytab", null)
+val tgt64 = System.getenv("KRB5CCNAME")
+require(keytab64 != null || tgt64 != null, "keytab or tgt required")
+require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be 
used at the same time")
+val mode = if (keytab64 != null) "keytab" else "tgt"
+val secretFile = if (keytab64 != null) keytab64 else tgt64
+logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and try to write new tokens back in an hour
+  logWarning("Couldn't broadcast tokens, trying agin in 20 
seconds", e)
+  credentialRenewerThread.schedule(this, 20, TimeUnit.SECONDS)
+  return
+  }
+  scheduleRenewal(this)
+}
+  }
+scheduleRenewal(credentialRenewerRunnable)
+  }
+
+  private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = {
+logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", 
null)}")
+// Get new delegation tokens by logging in with a new UGI
+// inspired by AMCredentialRenewer.scala:L174
+val ugi = if (mode == "keytab") {
+  UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 
secretFile)
+} else {
+  UserGroupInformation.getUGIFromTicketCache(secretFile, principal)
+}
+val tempCreds = 

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-20 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r140078733
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab64 = conf.get("spark.yarn.keytab", null)
+val tgt64 = System.getenv("KRB5CCNAME")
+require(keytab64 != null || tgt64 != null, "keytab or tgt required")
+require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be 
used at the same time")
+val mode = if (keytab64 != null) "keytab" else "tgt"
+val secretFile = if (keytab64 != null) keytab64 else tgt64
+logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and try to write new tokens back in an hour
--- End diff --

good catch, I changed the code to match the YARN equivalent. 


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-19 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r139847070
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab64 = conf.get("spark.yarn.keytab", null)
+val tgt64 = System.getenv("KRB5CCNAME")
+require(keytab64 != null || tgt64 != null, "keytab or tgt required")
+require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be 
used at the same time")
+val mode = if (keytab64 != null) "keytab" else "tgt"
+val secretFile = if (keytab64 != null) keytab64 else tgt64
+logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and try to write new tokens back in an hour
+  logWarning("Couldn't broadcast tokens, trying agin in 20 
seconds", e)
+  credentialRenewerThread.schedule(this, 20, TimeUnit.SECONDS)
+  return
+  }
+  scheduleRenewal(this)
+}
+  }
+scheduleRenewal(credentialRenewerRunnable)
+  }
+
+  private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = {
+logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", 
null)}")
+// Get new delegation tokens by logging in with a new UGI
+// inspired by AMCredentialRenewer.scala:L174
+val ugi = if (mode == "keytab") {
+  UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 
secretFile)
+} else {
+  UserGroupInformation.getUGIFromTicketCache(secretFile, principal)
+}
+val tempCreds = 

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r139841458
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -159,6 +159,13 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
   killExecutors(exec.toSeq, replace = true, force = true)
 }
+
+  case UpdateDelegationTokens(tokens) =>
+logDebug("Asking each executor to update HDFS delegation tokens")
+for ((x, executorData) <- executorDataMap) {
--- End diff --

Alternatively executorDataMap.values.foreach(...)
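Spelled out, that alternative would be:

```scala
// Iterate over the values directly; the executor id key is never used.
executorDataMap.values.foreach { executorData =>
  executorData.executorEndpoint.send(UpdateDelegationTokens(tokens))
}
```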


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-19 Thread susanxhuynh
Github user susanxhuynh commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r139772286
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab64 = conf.get("spark.yarn.keytab", null)
+val tgt64 = System.getenv("KRB5CCNAME")
+require(keytab64 != null || tgt64 != null, "keytab or tgt required")
+require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be 
used at the same time")
+val mode = if (keytab64 != null) "keytab" else "tgt"
+val secretFile = if (keytab64 != null) keytab64 else tgt64
+logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and try to write new tokens back in an hour
--- End diff --

Comment says "an hour" but code has 20 seconds.


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-19 Thread susanxhuynh
Github user susanxhuynh commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r139726573
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab64 = conf.get("spark.yarn.keytab", null)
+val tgt64 = System.getenv("KRB5CCNAME")
+require(keytab64 != null || tgt64 != null, "keytab or tgt required")
+require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be 
used at the same time")
+val mode = if (keytab64 != null) "keytab" else "tgt"
+val secretFile = if (keytab64 != null) keytab64 else tgt64
+logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and try to write new tokens back in an hour
+  logWarning("Couldn't broadcast tokens, trying agin in 20 
seconds", e)
--- End diff --

(sp) "ag**a**in"


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-19 Thread susanxhuynh
Github user susanxhuynh commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r139779444
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -159,6 +159,13 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
   killExecutors(exec.toSeq, replace = true, force = true)
 }
+
+  case UpdateDelegationTokens(tokens) =>
+logDebug("Asking each executor to update HDFS delegation tokens")
+for ((x, executorData) <- executorDataMap) {
--- End diff --

`(_, executorData)` would be more Scala-like.
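That is, something like:

```scala
// Underscore marks the unused executor id key.
for ((_, executorData) <- executorDataMap) {
  executorData.executorEndpoint.send(UpdateDelegationTokens(tokens))
}
```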


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r139642261
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -194,6 +198,24 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+if (principal != null) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  val credentialRenewerThread = new Thread {
+setName("MesosCredentialRenewer")
+override def run(): Unit = {
+  val credentialRenewer =
+new MesosCredentialRenewer(
+  conf,
+  hadoopDelegationTokenManager.get,
+  
MesosCredentialRenewer.getTokenRenewalInterval(hadoopDelegationCreds.get, conf),
--- End diff --

`hadoopDelegationCreds.get`: should we check against `None`? Creds are loaded in CoarseGrainedSchedulerBackend, but if they are missing, should we fail here?
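A small illustrative guard, if failing fast is the intent (message wording hypothetical):

```scala
// Fail with a clear error instead of letting hadoopDelegationCreds.get
// throw a bare NoSuchElementException.
val creds = hadoopDelegationCreds.getOrElse {
  throw new SparkException("Hadoop delegation tokens are not defined; cannot start renewer")
}
```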


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r139640414
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab64 = conf.get("spark.yarn.keytab", null)
+val tgt64 = System.getenv("KRB5CCNAME")
+require(keytab64 != null || tgt64 != null, "keytab or tgt required")
+require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be 
used at the same time")
+val mode = if (keytab64 != null) "keytab" else "tgt"
+val secretFile = if (keytab64 != null) keytab64 else tgt64
+logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and try to write new tokens back in an hour
+  logWarning("Couldn't broadcast tokens, trying agin in 20 
seconds", e)
+  credentialRenewerThread.schedule(this, 20, TimeUnit.SECONDS)
+  return
+  }
+  scheduleRenewal(this)
+}
+  }
+scheduleRenewal(credentialRenewerRunnable)
+  }
+
+  private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = {
+logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", 
null)}")
+// Get new delegation tokens by logging in with a new UGI
+// inspired by AMCredentialRenewer.scala:L174
+val ugi = if (mode == "keytab") {
+  UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 
secretFile)
+} else {
+  UserGroupInformation.getUGIFromTicketCache(secretFile, principal)
+}
+val tempCreds = 

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r139637343
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -159,6 +159,13 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
   killExecutors(exec.toSeq, replace = true, force = true)
 }
+
+  case UpdateDelegationTokens(tokens) =>
+logDebug("Asking each executor to update HDFS delegation tokens")
+for ((x, executorData) <- executorDataMap) {
+  
executorData.executorEndpoint.send(UpdateDelegationTokens(tokens))
+}
+
--- End diff --

remove space


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r139636800
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 ---
@@ -174,6 +178,16 @@ private[spark] class CoarseGrainedExecutorBackend(
 
 private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
+  private def addDelegationTokens(tokens: Array[Byte], sparkConf: 
SparkConf) {
--- End diff --

Should we move this to SparkHadoopUtil and re-use methods such as:
https://github.com/apache/spark/blob/d7b1fcf8f0a267322af0592b2cb31f1c8970fb16/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L131 ?
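For reference, a hedged sketch of what such a shared helper could look like; the body and the `deserialize` call are assumptions, not the merged API:

```scala
// In SparkHadoopUtil (sketch): deserialize the broadcast token bytes and add
// them to the current user's UGI so subsequent Hadoop calls pick them up.
def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf): Unit = {
  UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
  val creds = deserialize(tokens)  // assumed bytes -> Credentials helper
  UserGroupInformation.getCurrentUser.addCredentials(creds)
}
```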


---



