[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20657


---




[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-19 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r175638637
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
@@ -105,7 +105,8 @@ private[spark] class MesosHadoopDelegationTokenManager(
         case e: Exception =>
           // Log the error and try to write new tokens back in an hour
           logWarning("Couldn't broadcast tokens, trying again in an hour", e)
--- End diff --

Shall we update the log message to reflect the configured wait time?
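
A minimal sketch of that change, assuming the configured wait is available
in seconds as `retryWaitSecs` (a stand-in name here) and reusing Spark's
existing duration formatter:

    import org.apache.spark.ui.UIUtils

    // Report the configured wait instead of the hardcoded "an hour".
    logWarning("Couldn't broadcast tokens, trying again in " +
      UIUtils.formatDuration(retryWaitSecs * 1000), e)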


---




[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r173522390


---

[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-08 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r173383032
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -520,4 +520,16 @@ package object config {
       .checkValue(v => v > 0, "The threshold should be positive.")
       .createWithDefault(1000)
 
+  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
+    ConfigBuilder("spark.security.credentials.renewalRatio")
+      .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
+      .doubleConf
+      .createWithDefault(0.75d)
+
+  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
+    ConfigBuilder("spark.security.credentials.retryWait")
+      .doc("How long to wait before retrying to fetch new credentials after a failure.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("1h")
--- End diff --

Is this "1h" too big if the token expire time is small, for example 8 
hours, or even smaller, which will make the next retry directly fail.
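
One possible mitigation (a sketch only, assuming the time left until the
current tokens expire is known as `timeToExpiryMs`, a hypothetical name):
cap the configured wait by the remaining token lifetime.

    val configuredWaitMs =
      TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
    // Never schedule the retry past the point where the current tokens expire.
    val retryWaitMs = math.min(configuredWaitMs, timeToExpiryMs)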


---




[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-08 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r173380826


---

[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-08 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r173236591


---

[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-07 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r173073703
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -520,4 +520,16 @@ package object config {
       .checkValue(v => v > 0, "The threshold should be positive.")
       .createWithDefault(1000)
 
+  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
+    ConfigBuilder("spark.security.credentials.renewalRatio")
+      .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
+      .doubleConf
+      .createWithDefault(0.75d)
+
+  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
+    ConfigBuilder("spark.security.credentials.retryWait")
+      .doc("How long to wait before retrying to fetch new credentials after a failure.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("1h")
--- End diff --

It would be better to make them undocumented, so that developers can still 
adjust them for testing, but end users don't need to touch them.


---




[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-07 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r173078047


---

[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-07 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172954706
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -93,11 +93,24 @@ private[spark] class Client(
 
   private val distCacheMgr = new ClientDistributedCacheManager()
 
-  private var loginFromKeytab = false
-  private var principal: String = null
-  private var keytab: String = null
-  private var credentials: Credentials = null
-  private var amKeytabFileName: String = null
+  private val principal = sparkConf.get(PRINCIPAL).orNull
+  private val keytab = sparkConf.get(KEYTAB).orNull
+  private val loginFromKeytab = principal != null
+  private val amKeytabFileName: String = {
+    require((principal == null) == (keytab == null),
+      "Both principal and keytab must be defined, or neither.")
+    if (loginFromKeytab) {
+      logInfo(s"Kerberos credentials: principal = $principal, keytab = $keytab")
+      // Generate a file name that can be used for the keytab file, that does not conflict
+      // with any user file.
+      new File(keytab).getName() + "-" + UUID.randomUUID().toString
+    } else {
+      null
+    }
+  }
+
+  // Defensive copy of the credentials
+  private val credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials)
--- End diff --

This appears to be unused. Did you mean to use it in `setupSecurityToken()`? 
I'm not really sure what you're defending against with the copy; perhaps 
that should go in the comment as well. I see it was just carried over from 
the old code, though.
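
If the copy stays, a clarifying comment would help; my guess at the intent
(an assumption, not something the PR states):

    // Snapshot the submitter's credentials at construction time, so tokens
    // added to the current UGI later don't silently change what the client
    // hands to YARN.
    private val credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials)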


---




[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-07 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172963955
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---
@@ -18,221 +18,156 @@ package org.apache.spark.deploy.yarn.security
 
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.ThreadUtils
 
 /**
- * The following methods are primarily meant to make sure long-running apps like Spark
- * Streaming apps can run without interruption while accessing secured services. The
- * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
- * This method wakes up a thread that logs into the KDC
- * once 75% of the renewal interval of the original credentials used for the container
- * has elapsed. It then obtains new credentials and writes them to HDFS in a
- * pre-specified location - the prefix of which is specified in the sparkConf by
- * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
- * - each update goes to a new file, with a monotonically increasing suffix), also the
- * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
- * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
+ * A manager tasked with periodically updating delegation tokens needed by the application.
  *
- * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
- * called once 80% of the validity of the original credentials has elapsed. At that time the
- * executor finds the credentials file with the latest timestamp and checks if it has read those
- * credentials before (by keeping track of the suffix of the last file it read). If a new file has
- * appeared, it will read the credentials and update the currently running UGI with it. This
- * process happens again once 80% of the validity of this has expired.
+ * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
+ * without interruption while accessing secured services. It periodically logs in to the KDC with
+ * user-provided credentials, and contacts all the configured secure services to obtain delegation
+ * tokens to be distributed to the rest of the application.
--- End diff --

For folks like me who are less familiar with this, this seems like a good 
spot to explain the overall flow a little bit more. E.g.:

The KDC provides a ticket-granting ticket (TGT), which is then used to 
obtain delegation tokens for each service. The KDC does not expose the 
TGT's expiry time, so renewal is controlled by a conf (by default 1m, much 
more frequent than usual expiry times). Each service's delegation token 
provider should determine the expiry time of its delegation tokens, so 
they can be renewed appropriately.

(In particular, I needed an extra read to figure out why the TGT had its 
own renewal mechanism.)
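
A minimal sketch of the TGT side of this, assuming a scheduled executor and
a configurable check period (`checkPeriodSecs` is a stand-in name). Hadoop's
UGI exposes `checkTGTAndReloginFromKeytab()`, which is a no-op while the TGT
is still fresh, so polling it often is cheap:

    val tgtCheck = new Runnable {
      override def run(): Unit = {
        // No-op unless the TGT is close to expiring; then it re-logs in.
        UserGroupInformation.getLoginUser.checkTGTAndReloginFromKeytab()
      }
    }
    renewalExecutor.scheduleAtFixedRate(tgtCheck, 0, checkPeriodSecs, TimeUnit.SECONDS)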


---




[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172604366
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -520,4 +520,16 @@ package object config {
       .checkValue(v => v > 0, "The threshold should be positive.")
       .createWithDefault(1000)
 
+  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
+    ConfigBuilder("spark.security.credentials.renewalRatio")
+      .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
+      .doubleConf
+      .createWithDefault(0.75d)
+
+  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
+    ConfigBuilder("spark.security.credentials.retryWait")
+      .doc("How long to wait before retrying to fetch new credentials after a failure.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("1h")
--- End diff --

To me internal is something that is for internal Spark use, e.g. the 
configs I'm removing which are set by Spark itself.


---




[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172581601
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---
@@ -144,7 +145,8 @@ class SparkHadoopUtil extends Logging {
   private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
     UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
     val creds = deserialize(tokens)
-    logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
+    logInfo("Updating delegation tokens for current user.")
--- End diff --

Yeah, I was thinking it might be handy to have it logged in the executors 
and driver as well, sort of as an RPC id, so you could correlate the log 
lines in case there was ever a delay in propagation or a failure to reach 
one executor, since you're choosing to always log something here. Still, 
your call.
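
One lightweight option that keeps the detail available for this kind of
correlation without spamming INFO (a sketch; `dumpTokens` is the existing
helper in SparkHadoopUtil):

    logInfo("Updating delegation tokens for current user.")
    logDebug(s"Adding/updating delegation tokens ${dumpTokens(creds)}")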


---




[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172579936
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -520,4 +520,16 @@ package object config {
       .checkValue(v => v > 0, "The threshold should be positive.")
       .createWithDefault(1000)
 
+  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
+    ConfigBuilder("spark.security.credentials.renewalRatio")
+      .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
+      .doubleConf
+      .createWithDefault(0.75d)
+
+  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
+    ConfigBuilder("spark.security.credentials.retryWait")
+      .doc("How long to wait before retrying to fetch new credentials after a failure.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("1h")
--- End diff --

I thought that is what internal meant ... a user *could* specify them, but 
we don't document them at all, so not a stable part of the api etc.


---




[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-05 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172362293
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---
@@ -144,7 +145,8 @@ class SparkHadoopUtil extends Logging {
   private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
     UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
     val creds = deserialize(tokens)
-    logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
+    logInfo("Updating delegation tokens for current user.")
--- End diff --

That information is already logged by the AM, that's enough for debugging.


---




[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-05 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172362119


---

[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-05 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172361945
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -520,4 +520,16 @@ package object config {
       .checkValue(v => v > 0, "The threshold should be positive.")
       .createWithDefault(1000)
 
+  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
+    ConfigBuilder("spark.security.credentials.renewalRatio")
+      .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
+      .doubleConf
+      .createWithDefault(0.75d)
+
+  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
+    ConfigBuilder("spark.security.credentials.retryWait")
+      .doc("How long to wait before retrying to fetch new credentials after a failure.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("1h")
--- End diff --

They're not really internal, just not documented.


---




[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172319244


---

[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172322650
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -520,4 +520,16 @@ package object config {
       .checkValue(v => v > 0, "The threshold should be positive.")
       .createWithDefault(1000)
 
+  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
+    ConfigBuilder("spark.security.credentials.renewalRatio")
+      .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
+      .doubleConf
+      .createWithDefault(0.75d)
+
+  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
+    ConfigBuilder("spark.security.credentials.retryWait")
+      .doc("How long to wait before retrying to fetch new credentials after a failure.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("1h")
--- End diff --

`.internal()` for both
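
For reference, this is what the suggestion would look like; `internal()` is
the existing ConfigBuilder method that keeps an entry settable but out of
the documented configuration surface:

    private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
      ConfigBuilder("spark.security.credentials.retryWait")
        .internal()  // adjustable for testing, but not documented for end users
        .doc("How long to wait before retrying to fetch new credentials after a failure.")
        .timeConf(TimeUnit.SECONDS)
        .createWithDefaultString("1h")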


---




[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172323592


---

[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172325576
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---
@@ -144,7 +145,8 @@ class SparkHadoopUtil extends Logging {
   private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
     UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
     val creds = deserialize(tokens)
-    logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
+    logInfo("Updating delegation tokens for current user.")
--- End diff --

Just a thought: rather than just serializing the Credentials, would it be 
helpful to also serialize a timestamp for when the tokens were obtained and 
when they will be refreshed, so it could be logged here? You have spent 
more time debugging problem cases, so you probably have a better idea of 
whether that would be helpful.
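
A sketch of the idea, with an illustrative wrapper (the `TokenUpdate` class
and its fields are hypothetical, not Spark's actual wire format):

    // Bundle bookkeeping timestamps with the serialized tokens so receivers
    // can log when the tokens were obtained and when a refresh is expected.
    case class TokenUpdate(tokens: Array[Byte], obtainedAtMs: Long, nextRenewalAtMs: Long)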


---




[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172321966
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -1009,7 +987,7 @@ private[spark] class Client(
   }
 
   def setupCredentials(): Unit = {
-    loginFromKeytab = sparkConf.contains(PRINCIPAL.key)
+    loginFromKeytab = sparkConf.contains(PRINCIPAL)
--- End diff --

If a user only specifies a keytab but no principal, I don't think this will 
fail in a friendly way. This will be a no-op, so it'll succeed, and then in 
ApplicationMaster / AMCredentialRenewer you'll get an error trying to do 
`sparkConf.get(PRINCIPAL).get`.
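
A fail-fast check at submission time would surface this earlier; a sketch
that mirrors the require the PR already adds around amKeytabFileName:

    require(sparkConf.contains(PRINCIPAL) == sparkConf.contains(KEYTAB),
      "Both principal and keytab must be defined, or neither.")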


---




[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-02-21 Thread vanzin
GitHub user vanzin opened a pull request:

https://github.com/apache/spark/pull/20657

[SPARK-23361][yarn] Allow AM to restart after initial tokens expire.

Currently, the Spark AM relies on the initial set of tokens created by
the submission client to be able to talk to HDFS and other services that
require delegation tokens. This means that after those tokens expire, a
new AM will fail to start (e.g. when there is an application failure and
re-attempts are enabled).

This PR makes it so that the first thing the AM does when the user provides
a principal and keytab is to create new delegation tokens for use. This
makes sure that the AM can be started irrespective of how old the original
token set is. It also allows all of the token management to be done by the
AM - there is no need for the submission client to set configuration values
to tell the AM when to renew tokens.

Note that even though in this case the AM will not be using the delegation
tokens created by the submission client, those tokens still need to be 
provided
to YARN, since they are used to do log aggregation.

To be able to re-use the code in AMCredentialRenewer for the above
purposes, I refactored that class a bit so that it can fetch tokens into
a pre-defined UGI, instead of always logging in.

Another issue with re-attempts is that, after the fix that allows the AM
to restart correctly, new executors would get confused about when to
update credentials, because the credential updater used the update time
initially set up by the submission code. This could make the executor
fail to update credentials in time, since that value would be very out
of date in the situation described in the bug.

To fix that, I changed the YARN code to use the new RPC-based mechanism
for distributing tokens to executors. This allowed the old credential
updater code to be removed, and a lot of code in the renewer to be
simplified.

I also made two currently hardcoded values (the renewal time ratio, and
the retry wait) configurable; while this probably never needs to be set
by anyone in a production environment, it helps with testing; that's also
why they're not documented.

Tested on real cluster with a specially crafted application to test this
functionality: checked proper access to HDFS, Hive and HBase in cluster
mode with token renewal on and AM restarts. Tested things still work in
client mode too.
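
As a rough illustration of the renewal scheduling described above (a sketch;
`nextRenewalTime` stands in for the expiry timestamp reported by the token
providers, and `renewalTask` for the Runnable that fetches new tokens):

    val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)  // 0.75 by default
    val delayMs = ((nextRenewalTime - System.currentTimeMillis()) * ratio).toLong
    renewalExecutor.schedule(renewalTask, delayMs, TimeUnit.MILLISECONDS)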

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vanzin/spark SPARK-23361

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20657.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20657


commit 2c3448dd3aa4071234a65a1c9317b1a3c4fe8d24
Author: Marcelo Vanzin 
Date:   2018-02-09T22:33:05Z

[SPARK-23361][yarn] Allow AM to restart after initial tokens expire.