Repository: spark
Updated Branches:
  refs/heads/master b2edc30db -> 5fa438471


[SPARK-23361][YARN] Allow AM to restart after initial tokens expire.

Currently, the Spark AM relies on the initial set of tokens created by
the submission client to be able to talk to HDFS and other services that
require delegation tokens. This means that after those tokens expire, a
new AM will fail to start (e.g. when there is an application failure and
re-attempts are enabled).

This PR makes it so that the first thing the AM does when the user provides
a principal and keytab is to create new delegation tokens for use. This
makes sure that the AM can be started irrespective of how old the original
token set is. It also allows all of the token management to be done by the
AM - there is no need for the submission client to set configuration values
to tell the AM when to renew tokens.

Note that even though in this case the AM will not be using the delegation
tokens created by the submission client, those tokens still need to be provided
to YARN, since they are used to do log aggregation.

To be able to re-use the code in the AMCredentialRenewer for the above
purposes, I refactored that class a bit so that it can fetch tokens into
a pre-defined UGI, instead of always logging in.

Another issue with re-attempts is that, after the fix that allows the AM
to restart correctly, new executors would get confused about when to
update credentials, because the credential updater used the update time
initially set up by the submission code. This could make the executor
fail to update credentials in time, since that value would be very out
of date in the situation described in the bug.

To fix that, I changed the YARN code to use the new RPC-based mechanism
for distributing tokens to executors. This allowed the old credential
updater code to be removed, and a lot of code in the renewer to be
simplified.

I also made two currently hardcoded values (the renewal time ratio and
the retry wait) configurable. They probably never need to be set by
anyone in a production environment, but they help with testing, which is
also why they're not documented.
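
As a rough illustration of what these two settings control, here is a
minimal sketch (not the code in this commit). The object and function
names are hypothetical, and the ratio and wait are passed in directly
instead of being read from SparkConf. The renewal formula mirrors
nextCredentialRenewalTime in the diff below; the retry helper assumes a
failed attempt is simply rescheduled after the configured wait.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical helper, for illustration only.
object RenewalTiming {
  // Time at which new credentials should be created: the given fraction
  // of the remaining lifetime, measured from `now` (all values in ms).
  def nextRenewalTime(now: Long, expiration: Long, ratio: Double): Long =
    (now + (ratio * (expiration - now))).toLong

  // Time of the next attempt after a failure, given the retry wait in seconds.
  def nextRetryTime(now: Long, retryWaitSeconds: Long): Long =
    now + TimeUnit.SECONDS.toMillis(retryWaitSeconds)
}
```

With the defaults declared in this change (a ratio of 0.75 and a retry
wait of 1h), tokens valid for 24 hours would be replaced after about 18
hours, and a failed attempt would be retried an hour later. A test can
shrink both values so that renewal happens within seconds.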

Tested on a real cluster with a specially crafted application to test this
functionality: checked proper access to HDFS, Hive and HBase in cluster
mode with token renewal on and AM restarts. Tested that things still work
in client mode too.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #20657 from vanzin/SPARK-23361.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5fa43847
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5fa43847
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5fa43847

Branch: refs/heads/master
Commit: 5fa438471110afbf4e2174df449ac79e292501f8
Parents: b2edc30
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Fri Mar 23 13:59:21 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Fri Mar 23 13:59:21 2018 +0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |  12 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  32 +-
 .../executor/CoarseGrainedExecutorBackend.scala |  12 -
 .../apache/spark/internal/config/package.scala  |  12 +
 .../MesosHadoopDelegationTokenManager.scala     |  11 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   | 117 +++-----
 .../org/apache/spark/deploy/yarn/Client.scala   | 102 +++----
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  20 --
 .../org/apache/spark/deploy/yarn/config.scala   |  25 --
 .../yarn/security/AMCredentialRenewer.scala     | 291 ++++++++-----------
 .../yarn/security/CredentialUpdater.scala       | 131 ---------
 .../YARNHadoopDelegationTokenManager.scala      |   9 +-
 .../cluster/YarnClientSchedulerBackend.scala    |   9 +-
 .../cluster/YarnSchedulerBackend.scala          |  10 +-
 .../YARNHadoopDelegationTokenManagerSuite.scala |   7 +-
 .../org/apache/spark/streaming/Checkpoint.scala |   3 -
 16 files changed, 238 insertions(+), 565 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index f53b2be..129956e 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -603,13 +603,15 @@ private[spark] object SparkConf extends Logging {
         "Please use spark.kryoserializer.buffer instead. The default value for " +
           "spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
           "are no longer accepted. To specify the equivalent now, one may use '64k'."),
-      DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
+      DeprecatedConfig("spark.rpc", "2.0", "Not used anymore."),
       DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
         "Please use the new blacklisting options, spark.blacklist.*"),
-      DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
-      DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more"),
+      DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used anymore"),
+      DeprecatedConfig("spark.executor.port", "2.0.0", "Not used anymore"),
       DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0",
-        "Not used any more. Please use spark.shuffle.service.index.cache.size")
+        "Not used anymore. Please use spark.shuffle.service.index.cache.size"),
+      DeprecatedConfig("spark.yarn.credentials.file.retention.count", "2.4.0", "Not used anymore."),
+      DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore.")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
@@ -748,7 +750,7 @@ private[spark] object SparkConf extends Logging {
     }
     if (key.startsWith("spark.akka") || key.startsWith("spark.ssl.akka")) {
       logWarning(
-        s"The configuration key $key is not supported any more " +
+        s"The configuration key $key is not supported anymore " +
           s"because Spark doesn't use Akka since 2.0")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 177295f..8353e64 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -40,6 +40,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
 /**
@@ -146,7 +147,8 @@ class SparkHadoopUtil extends Logging {
   private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
     UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
     val creds = deserialize(tokens)
-    logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
+    logInfo("Updating delegation tokens for current user.")
+    logDebug(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
     addCurrentUserCredentials(creds)
   }
 
@@ -322,19 +324,6 @@ class SparkHadoopUtil extends Logging {
   }
 
   /**
-   * Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism.
-   * This is to prevent the DFSClient from using an old cached token to connect to the NameNode.
-   */
-  private[spark] def getConfBypassingFSCache(
-      hadoopConf: Configuration,
-      scheme: String): Configuration = {
-    val newConf = new Configuration(hadoopConf)
-    val confKey = s"fs.${scheme}.impl.disable.cache"
-    newConf.setBoolean(confKey, true)
-    newConf
-  }
-
-  /**
    * Dump the credentials' tokens to string values.
    *
    * @param credentials credentials
@@ -447,16 +436,17 @@ object SparkHadoopUtil {
   def get: SparkHadoopUtil = instance
 
   /**
-   * Given an expiration date (e.g. for Hadoop Delegation Tokens) return a the date
-   * when a given fraction of the duration until the expiration date has passed.
-   * Formula: current time + (fraction * (time until expiration))
+   * Given an expiration date for the current set of credentials, calculate the time when new
+   * credentials should be created.
+   *
    * @param expirationDate Drop-dead expiration date
-   * @param fraction fraction of the time until expiration return
-   * @return Date when the fraction of the time until expiration has passed
+   * @param conf Spark configuration
+   * @return Timestamp when new credentials should be created.
    */
-  private[spark] def getDateOfNextUpdate(expirationDate: Long, fraction: Double): Long = {
+  private[spark] def nextCredentialRenewalTime(expirationDate: Long, conf: SparkConf): Long = {
     val ct = System.currentTimeMillis
-    (ct + (fraction * (expirationDate - ct))).toLong
+    val ratio = conf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
+    (ct + (ratio * (expirationDate - ct))).toLong
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 9b62e4b..48d3630 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -213,13 +213,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
           driverConf.set(key, value)
         }
       }
-      if (driverConf.contains("spark.yarn.credentials.file")) {
-        logInfo("Will periodically update credentials from: " +
-          driverConf.get("spark.yarn.credentials.file"))
-        Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
-          .getMethod("startCredentialUpdater", classOf[SparkConf])
-          .invoke(null, driverConf)
-      }
 
       cfg.hadoopDelegationCreds.foreach { tokens =>
         SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
@@ -234,11 +227,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
       }
       env.rpcEnv.awaitTermination()
-      if (driverConf.contains("spark.yarn.credentials.file")) {
-        Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
-          .getMethod("stopCredentialUpdater")
-          .invoke(null)
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a313ad0..407545a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -525,4 +525,16 @@ package object config {
     .bytesConf(ByteUnit.BYTE)
     .createWithDefaultString("1g")
 
+  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
+    ConfigBuilder("spark.security.credentials.renewalRatio")
+      .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
+      .doubleConf
+      .createWithDefault(0.75d)
+
+  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
+    ConfigBuilder("spark.security.credentials.retryWait")
+      .doc("How long to wait before retrying to fetch new credentials after a failure.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("1h")
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
index 7165bfa..a1bf4f0 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
@@ -29,6 +29,7 @@ import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.ThreadUtils
 
 
@@ -63,7 +64,7 @@ private[spark] class MesosHadoopDelegationTokenManager(
       val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
       val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
       logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
-      (SparkHadoopUtil.get.serialize(creds), SparkHadoopUtil.getDateOfNextUpdate(rt, 0.75))
+      (SparkHadoopUtil.get.serialize(creds), SparkHadoopUtil.nextCredentialRenewalTime(rt, conf))
     } catch {
       case e: Exception =>
         logError(s"Failed to fetch Hadoop delegation tokens $e")
@@ -104,8 +105,10 @@ private[spark] class MesosHadoopDelegationTokenManager(
           } catch {
             case e: Exception =>
               // Log the error and try to write new tokens back in an hour
-              logWarning("Couldn't broadcast tokens, trying again in an hour", e)
-              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
+              val delay = TimeUnit.SECONDS.toMillis(conf.get(config.CREDENTIALS_RENEWAL_RETRY_WAIT))
+              logWarning(
+                s"Couldn't broadcast tokens, trying again in ${UIUtils.formatDuration(delay)}", e)
+              credentialRenewerThread.schedule(this, delay, TimeUnit.MILLISECONDS)
               return
           }
           scheduleRenewal(this)
@@ -135,7 +138,7 @@ private[spark] class MesosHadoopDelegationTokenManager(
         "related configurations in the target services.")
       currTime
     } else {
-      SparkHadoopUtil.getDateOfNextUpdate(nextRenewalTime, 0.75)
+      SparkHadoopUtil.nextCredentialRenewalTime(nextRenewalTime, conf)
     }
     logInfo(s"Time of next renewal is in ${timeOfNextRenewal - System.currentTimeMillis()} ms")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 6e35d23..d04989e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -29,7 +29,6 @@ import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.util.StringUtils
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
@@ -41,7 +40,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, YARNHadoopDelegationTokenManager}
+import org.apache.spark.deploy.yarn.security.AMCredentialRenewer
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.rpc._
@@ -79,42 +78,43 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
 
   private val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
 
-  private val ugi = {
-    val original = UserGroupInformation.getCurrentUser()
-
-    // If a principal and keytab were provided, log in to kerberos, and set up a thread to
-    // renew the kerberos ticket when needed. Because the UGI API does not expose the TTL
-    // of the TGT, use a configuration to define how often to check that a relogin is necessary.
-    // checkTGTAndReloginFromKeytab() is a no-op if the relogin is not yet needed.
-    val principal = sparkConf.get(PRINCIPAL).orNull
-    val keytab = sparkConf.get(KEYTAB).orNull
-    if (principal != null && keytab != null) {
-      UserGroupInformation.loginUserFromKeytab(principal, keytab)
-
-      val renewer = new Thread() {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          while (true) {
-            TimeUnit.SECONDS.sleep(sparkConf.get(KERBEROS_RELOGIN_PERIOD))
-            UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab()
-          }
-        }
+  private val userClassLoader = {
+    val classpath = Client.getUserClasspath(sparkConf)
+    val urls = classpath.map { entry =>
+      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
+    }
+
+    if (isClusterMode) {
+      if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
+        new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+      } else {
+        new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
       }
-      renewer.setName("am-kerberos-renewer")
-      renewer.setDaemon(true)
-      renewer.start()
-
-      // Transfer the original user's tokens to the new user, since that's needed to connect to
-      // YARN. It also copies over any delegation tokens that might have been created by the
-      // client, which will then be transferred over when starting executors (until new ones
-      // are created by the periodic task).
-      val newUser = UserGroupInformation.getCurrentUser()
-      SparkHadoopUtil.get.transferCredentials(original, newUser)
-      newUser
     } else {
-      SparkHadoopUtil.get.createSparkUser()
+      new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
     }
   }
 
+  private val credentialRenewer: Option[AMCredentialRenewer] = sparkConf.get(KEYTAB).map { _ =>
+    new AMCredentialRenewer(sparkConf, yarnConf)
+  }
+
+  private val ugi = credentialRenewer match {
+    case Some(cr) =>
+      // Set the context class loader so that the token renewer has access to jars distributed
+      // by the user.
+      val currentLoader = Thread.currentThread().getContextClassLoader()
+      Thread.currentThread().setContextClassLoader(userClassLoader)
+      try {
+        cr.start()
+      } finally {
+        Thread.currentThread().setContextClassLoader(currentLoader)
+      }
+
+    case _ =>
+      SparkHadoopUtil.get.createSparkUser()
+  }
+
   private val client = doAsUser { new YarnRMClient() }
 
   // Default to twice the number of executors (twice the maximum number of executors if dynamic
@@ -148,23 +148,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   // A flag to check whether user has initialized spark context
   @volatile private var registered = false
 
-  private val userClassLoader = {
-    val classpath = Client.getUserClasspath(sparkConf)
-    val urls = classpath.map { entry =>
-      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
-    }
-
-    if (isClusterMode) {
-      if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
-        new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
-      } else {
-        new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
-      }
-    } else {
-      new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
-    }
-  }
-
   // Lock for controlling the allocator (heartbeat) thread.
   private val allocatorLock = new Object()
 
@@ -189,8 +172,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   // In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
   private val sparkContextPromise = Promise[SparkContext]()
 
-  private var credentialRenewer: AMCredentialRenewer = _
-
   // Load the list of localized files set by the client. This is used when launching executors,
   // and is loaded here so that these configs don't pollute the Web UI's environment page in
   // cluster mode.
@@ -316,31 +297,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         }
       }
 
-      // If the credentials file config is present, we must periodically renew tokens. So create
-      // a new AMDelegationTokenRenewer
-      if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
-        // Start a short-lived thread for AMCredentialRenewer, the only purpose is to set the
-        // classloader so that main jar and secondary jars could be used by AMCredentialRenewer.
-        val credentialRenewerThread = new Thread {
-          setName("AMCredentialRenewerStarter")
-          setContextClassLoader(userClassLoader)
-
-          override def run(): Unit = {
-            val credentialManager = new YARNHadoopDelegationTokenManager(
-              sparkConf,
-              yarnConf,
-              conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
-
-            val credentialRenewer =
-              new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
-            credentialRenewer.scheduleLoginFromKeytab()
-          }
-        }
-
-        credentialRenewerThread.start()
-        credentialRenewerThread.join()
-      }
-
       if (isClusterMode) {
         runDriver()
       } else {
@@ -409,9 +365,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
           logDebug("shutting down user thread")
           userClassThread.interrupt()
         }
-        if (!inShutdown && credentialRenewer != null) {
-          credentialRenewer.stop()
-          credentialRenewer = null
+        if (!inShutdown) {
+          credentialRenewer.foreach(_.stop())
         }
       }
     }
@@ -468,6 +423,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
       securityMgr,
       localResources)
 
+    credentialRenewer.foreach(_.setDriverRef(driverRef))
+
     // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
     // that when the driver sends an initial executor request (e.g. after an AM restart),
     // the allocator is ready to service requests.

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 28087de..5763c3d 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -93,11 +93,21 @@ private[spark] class Client(
 
   private val distCacheMgr = new ClientDistributedCacheManager()
 
-  private var loginFromKeytab = false
-  private var principal: String = null
-  private var keytab: String = null
-  private var credentials: Credentials = null
-  private var amKeytabFileName: String = null
+  private val principal = sparkConf.get(PRINCIPAL).orNull
+  private val keytab = sparkConf.get(KEYTAB).orNull
+  private val loginFromKeytab = principal != null
+  private val amKeytabFileName: String = {
+    require((principal == null) == (keytab == null),
+      "Both principal and keytab must be defined, or neither.")
+    if (loginFromKeytab) {
+      logInfo(s"Kerberos credentials: principal = $principal, keytab = $keytab")
+      // Generate a file name that can be used for the keytab file, that does not conflict
+      // with any user file.
+      new File(keytab).getName() + "-" + UUID.randomUUID().toString
+    } else {
+      null
+    }
+  }
 
   private val launcherBackend = new LauncherBackend() {
     override protected def conf: SparkConf = sparkConf
@@ -120,11 +130,6 @@ private[spark] class Client(
   private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
     .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
 
-  private val credentialManager = new YARNHadoopDelegationTokenManager(
-    sparkConf,
-    hadoopConf,
-    conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
-
   def reportLauncherState(state: SparkAppHandle.State): Unit = {
     launcherBackend.setState(state)
   }
@@ -145,9 +150,6 @@ private[spark] class Client(
     var appId: ApplicationId = null
     try {
       launcherBackend.connect()
-      // Setup the credentials before doing anything else,
-      // so we have don't have issues at any point.
-      setupCredentials()
       yarnClient.init(hadoopConf)
       yarnClient.start()
 
@@ -288,8 +290,26 @@ private[spark] class Client(
     appContext
   }
 
-  /** Set up security tokens for launching our ApplicationMaster container. */
+  /**
+   * Set up security tokens for launching our ApplicationMaster container.
+   *
+   * This method will obtain delegation tokens from all the registered providers, and set them in
+   * the AM's launch context.
+   */
   private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
+    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+    val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
+    credentialManager.obtainDelegationTokens(hadoopConf, credentials)
+
+    // When using a proxy user, copy the delegation tokens to the user's credentials. Avoid
+    // that for regular users, since in those case the user already has access to the TGT,
+    // and adding delegation tokens could lead to expired or cancelled tokens being used
+    // later, as reported in SPARK-15754.
+    val currentUser = UserGroupInformation.getCurrentUser()
+    if (SparkHadoopUtil.get.isProxyUser(currentUser)) {
+      currentUser.addCredentials(credentials)
+    }
+
     val dob = new DataOutputBuffer
     credentials.writeTokenStorageToStream(dob)
     amContainer.setTokens(ByteBuffer.wrap(dob.getData))
@@ -384,36 +404,6 @@ private[spark] class Client(
     // and add them as local resources to the application master.
     val fs = destDir.getFileSystem(hadoopConf)
 
-    // Merge credentials obtained from registered providers
-    val nearestTimeOfNextRenewal = credentialManager.obtainDelegationTokens(hadoopConf, credentials)
-
-    if (credentials != null) {
-      // Add credentials to current user's UGI, so that following operations 
don't need to use the
-      // Kerberos tgt to get delegations again in the client side.
-      val currentUser = UserGroupInformation.getCurrentUser()
-      if (SparkHadoopUtil.get.isProxyUser(currentUser)) {
-        currentUser.addCredentials(credentials)
-      }
-      logDebug(SparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
-    }
-
-    // If we use principal and keytab to login, also credentials can be renewed some time
-    // after current time, we should pass the next renewal and updating time to credential
-    // renewer and updater.
-    if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() &&
-      nearestTimeOfNextRenewal != Long.MaxValue) {
-
-      // Valid renewal time is 75% of next renewal time, and the valid update time will be
-      // slightly later then renewal time (80% of next renewal time). This is to make sure
-      // credentials are renewed and updated before expired.
-      val currTime = System.currentTimeMillis()
-      val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime
-      val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime
-
-      sparkConf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong)
-      sparkConf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong)
-    }
-
     // Used to keep track of URIs added to the distributed cache. If the same URI is added
     // multiple times, YARN will fail to launch containers for the app with an internal
     // error.
@@ -793,11 +783,6 @@ private[spark] class Client(
     populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
     env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
-    if (loginFromKeytab) {
-      val credentialsFile = "credentials-" + UUID.randomUUID().toString
-      sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
-      logInfo(s"Credentials file set to: $credentialsFile")
-    }
 
     // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
     val amEnvPrefix = "spark.yarn.appMasterEnv."
@@ -1014,25 +999,6 @@ private[spark] class Client(
     amContainer
   }
 
-  def setupCredentials(): Unit = {
-    loginFromKeytab = sparkConf.contains(PRINCIPAL.key)
-    if (loginFromKeytab) {
-      principal = sparkConf.get(PRINCIPAL).get
-      keytab = sparkConf.get(KEYTAB).orNull
-
-      require(keytab != null, "Keytab must be specified when principal is specified.")
-      logInfo("Attempting to login to the Kerberos" +
-        s" using principal: $principal and keytab: $keytab")
-      val f = new File(keytab)
-      // Generate a file name that can be used for the keytab file, that does not conflict
-      // with any user file.
-      amKeytabFileName = f.getName + "-" + UUID.randomUUID().toString
-      sparkConf.set(PRINCIPAL.key, principal)
-    }
-    // Defensive copy of the credentials
-    credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials)
-  }
-
   /**
    * Report the state of an application until it has exited, either successfully or
    * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index f406fab..8eda6cb 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.deploy.yarn.security.CredentialUpdater
 import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
@@ -38,8 +37,6 @@ import org.apache.spark.util.Utils
 
 object YarnSparkHadoopUtil {
 
-  private var credentialUpdater: CredentialUpdater = _
-
   // Additional memory overhead
   // 10% was arrived at experimentally. In the interest of minimizing memory waste while covering
   // the common cases. Memory overhead tends to grow with container size.
@@ -206,21 +203,4 @@ object YarnSparkHadoopUtil {
     filesystemsToAccess + stagingFS
   }
 
-  def startCredentialUpdater(sparkConf: SparkConf): Unit = {
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
-    val credentialManager = new YARNHadoopDelegationTokenManager(
-      sparkConf,
-      hadoopConf,
-      conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
-    credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, credentialManager)
-    credentialUpdater.start()
-  }
-
-  def stopCredentialUpdater(): Unit = {
-    if (credentialUpdater != null) {
-      credentialUpdater.stop()
-      credentialUpdater = null
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 3ba3ae5..1a99b3b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -231,16 +231,6 @@ package object config {
 
   /* Security configuration. */
 
-  private[spark] val CREDENTIAL_FILE_MAX_COUNT =
-    ConfigBuilder("spark.yarn.credentials.file.retention.count")
-      .intConf
-      .createWithDefault(5)
-
-  private[spark] val CREDENTIALS_FILE_MAX_RETENTION =
-    ConfigBuilder("spark.yarn.credentials.file.retention.days")
-      .intConf
-      .createWithDefault(5)
-
   private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes")
     .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " +
       "fs.defaultFS does not need to be listed here.")
@@ -271,11 +261,6 @@ package object config {
 
   /* Private configs. */
 
-  private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file")
-    .internal()
-    .stringConf
-    .createWithDefault(null)
-
   // Internal config to propagate the location of the user's jar to the driver/executors
   private[spark] val APP_JAR = ConfigBuilder("spark.yarn.user.jar")
     .internal()
@@ -329,16 +314,6 @@ package object config {
     .stringConf
     .createOptional
 
-  private[spark] val CREDENTIALS_RENEWAL_TIME = ConfigBuilder("spark.yarn.credentials.renewalTime")
-    .internal()
-    .timeConf(TimeUnit.MILLISECONDS)
-    .createWithDefault(Long.MaxValue)
-
-  private[spark] val CREDENTIALS_UPDATE_TIME = ConfigBuilder("spark.yarn.credentials.updateTime")
-    .internal()
-    .timeConf(TimeUnit.MILLISECONDS)
-    .createWithDefault(Long.MaxValue)
-
   private[spark] val KERBEROS_RELOGIN_PERIOD = ConfigBuilder("spark.yarn.kerberos.relogin.period")
     .timeConf(TimeUnit.SECONDS)
     .createWithDefaultString("1m")

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
index eaf2cff..bc8d47d 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
@@ -18,221 +18,160 @@ package org.apache.spark.deploy.yarn.security
 
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.ThreadUtils
 
 /**
- * The following methods are primarily meant to make sure long-running apps like Spark
- * Streaming apps can run without interruption while accessing secured services. The
- * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
- * This method wakes up a thread that logs into the KDC
- * once 75% of the renewal interval of the original credentials used for the container
- * has elapsed. It then obtains new credentials and writes them to HDFS in a
- * pre-specified location - the prefix of which is specified in the sparkConf by
- * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
- * - each update goes to a new file, with a monotonically increasing suffix), also the
- * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
- * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
+ * A manager tasked with periodically updating delegation tokens needed by the application.
  *
- * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
- * called once 80% of the validity of the original credentials has elapsed. At that time the
- * executor finds the credentials file with the latest timestamp and checks if it has read those
- * credentials before (by keeping track of the suffix of the last file it read). If a new file has
- * appeared, it will read the credentials and update the currently running UGI with it. This
- * process happens again once 80% of the validity of this has expired.
+ * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
+ * without interruption while accessing secured services. It periodically logs in to the KDC with
+ * user-provided credentials, and contacts all the configured secure services to obtain delegation
+ * tokens to be distributed to the rest of the application.
+ *
+ * This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API
+ * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
+ * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
+ * needed. The check period can be overridden in the configuration.
+ *
+ * New delegation tokens are created once 75% of the renewal interval of the original tokens has
+ * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
+ * The driver is tasked with distributing the tokens to other processes that might need them.
  */
 private[yarn] class AMCredentialRenewer(
     sparkConf: SparkConf,
-    hadoopConf: Configuration,
-    credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
+    hadoopConf: Configuration) extends Logging {
 
-  private var lastCredentialsFileSuffix = 0
+  private val principal = sparkConf.get(PRINCIPAL).get
+  private val keytab = sparkConf.get(KEYTAB).get
+  private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
 
-  private val credentialRenewerThread: ScheduledExecutorService =
+  private val renewalExecutor: ScheduledExecutorService =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
 
-  private val hadoopUtil = SparkHadoopUtil.get
+  private val driverRef = new AtomicReference[RpcEndpointRef]()
 
-  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
-  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
-  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
-  private val freshHadoopConf =
-    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
+  private val renewalTask = new Runnable() {
+    override def run(): Unit = {
+      updateTokensTask()
+    }
+  }
 
-  @volatile private var timeOfNextRenewal: Long = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
+  def setDriverRef(ref: RpcEndpointRef): Unit = {
+    driverRef.set(ref)
+  }
 
   /**
-   * Schedule a login from the keytab and principal set using the --principal and --keytab
-   * arguments to spark-submit. This login happens only when the credentials of the current user
-   * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
-   * SparkConf to do the login. This method is a no-op in non-YARN mode.
+   * Start the token renewer. Upon start, the renewer will:
    *
+   * - log in the configured user, and set up a task to keep that user's ticket renewed
+   * - obtain delegation tokens from all available providers
+   * - schedule a periodic task to update the tokens when needed.
+   *
+   * @return The newly logged in user.
    */
-  private[spark] def scheduleLoginFromKeytab(): Unit = {
-    val principal = sparkConf.get(PRINCIPAL).get
-    val keytab = sparkConf.get(KEYTAB).get
-
-    /**
-     * Schedule re-login and creation of new credentials. If credentials have already expired, this
-     * method will synchronously create new ones.
-     */
-    def scheduleRenewal(runnable: Runnable): Unit = {
-      // Run now!
-      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
-      if (remainingTime <= 0) {
-        logInfo("Credentials have expired, creating new ones now.")
-        runnable.run()
-      } else {
-        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
-        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
+  def start(): UserGroupInformation = {
+    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
+    val ugi = doLogin()
+
+    val tgtRenewalTask = new Runnable() {
+      override def run(): Unit = {
+        ugi.checkTGTAndReloginFromKeytab()
       }
     }
+    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
+    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
+      TimeUnit.SECONDS)
 
-    // This thread periodically runs on the AM to update the credentials on HDFS.
-    val credentialRenewerRunnable =
-      new Runnable {
-        override def run(): Unit = {
-          try {
-            writeNewCredentialsToHDFS(principal, keytab)
-            cleanupOldFiles()
-          } catch {
-            case e: Exception =>
-              // Log the error and try to write new tokens back in an hour
-              logWarning("Failed to write out new credentials to HDFS, will try again in an " +
-                "hour! If this happens too often tasks will fail.", e)
-              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
-              return
-          }
-          scheduleRenewal(this)
-        }
-      }
-    // Schedule update of credentials. This handles the case of updating the credentials right now
-    // as well, since the renewal interval will be 0, and the thread will get scheduled
-    // immediately.
-    scheduleRenewal(credentialRenewerRunnable)
+    val creds = obtainTokensAndScheduleRenewal(ugi)
+    ugi.addCredentials(creds)
+
+    // Transfer the original user's tokens to the new user, since that's needed to connect to
+    // YARN. Explicitly avoid overwriting tokens that already exist in the current user's
+    // credentials, since those were freshly obtained above (see SPARK-23361).
+    val existing = ugi.getCredentials()
+    existing.mergeAll(originalCreds)
+    ugi.addCredentials(existing)
+
+    ugi
+  }
+
+  def stop(): Unit = {
+    renewalExecutor.shutdown()
+  }
+
+  private def scheduleRenewal(delay: Long): Unit = {
+    val _delay = math.max(0, delay)
+    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
+    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
   }
 
-  // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
-  // least numFilesToKeep files are kept for safety
-  private def cleanupOldFiles(): Unit = {
-    import scala.concurrent.duration._
+  /**
+   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
+   * to fetch the next set of tokens when needed.
+   */
+  private def updateTokensTask(): Unit = {
     try {
-      val remoteFs = FileSystem.get(freshHadoopConf)
-      val credentialsPath = new Path(credentialsFile)
-      val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis
-      hadoopUtil.listFilesSorted(
-        remoteFs, credentialsPath.getParent,
-        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-        .dropRight(numFilesToKeep)
-        .takeWhile(_.getModificationTime < thresholdTime)
-        .foreach(x => remoteFs.delete(x.getPath, true))
+      val freshUGI = doLogin()
+      val creds = obtainTokensAndScheduleRenewal(freshUGI)
+      val tokens = SparkHadoopUtil.get.serialize(creds)
+
+      val driver = driverRef.get()
+      if (driver != null) {
+        logInfo("Updating delegation tokens.")
+        driver.send(UpdateDelegationTokens(tokens))
+      } else {
+        // This shouldn't really happen, since the driver should register way before tokens expire
+        // (or the AM should time out the application).
+        logWarning("Delegation tokens close to expiration but no driver has registered yet.")
+        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
+      }
     } catch {
-      // Such errors are not fatal, so don't throw. Make sure they are logged though
       case e: Exception =>
-        logWarning("Error while attempting to cleanup old credentials. If you are seeing many " +
-          "such warnings there may be an issue with your HDFS cluster.", e)
+        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
+        logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
+          " If this happens too often tasks will fail.", e)
+        scheduleRenewal(delay)
     }
   }
 
-  private def writeNewCredentialsToHDFS(principal: String, keytab: String): Unit = {
-    // Keytab is copied by YARN to the working directory of the AM, so full path is
-    // not needed.
-
-    // HACK:
-    // HDFS will not issue new delegation tokens, if the Credentials object
-    // passed in already has tokens for that FS even if the tokens are expired (it really only
-    // checks if there are tokens for the service, and not if they are valid). So the only real
-    // way to get new tokens is to make sure a different Credentials object is used each time to
-    // get new tokens and then the new tokens are copied over the current user's Credentials.
-    // So:
-    // - we login as a different user and get the UGI
-    // - use that UGI to get the tokens (see doAs block below)
-    // - copy the tokens over to the current user's credentials (this will overwrite the tokens
-    // in the current user's Credentials object for this FS).
-    // The login to KDC happens each time new tokens are required, but this is rare enough to not
-    // have to worry about (like once every day or so). This makes this code clearer than having
-    // to login and then relogin every time (the HDFS API may not relogin since we don't use this
-    // UGI directly for HDFS communication.
-    logInfo(s"Attempting to login to KDC using principal: $principal")
-    val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
-    logInfo("Successfully logged into KDC.")
-    val tempCreds = keytabLoggedInUGI.getCredentials
-    val credentialsPath = new Path(credentialsFile)
-    val dst = credentialsPath.getParent
-    var nearestNextRenewalTime = Long.MaxValue
-    keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
-      // Get a copy of the credentials
-      override def run(): Void = {
-        nearestNextRenewalTime = credentialManager.obtainDelegationTokens(
-          freshHadoopConf,
-          tempCreds)
-        null
+  /**
+   * Obtain new delegation tokens from the available providers. Schedules a new task to fetch
+   * new tokens before the new set expires.
+   *
+   * @return Credentials containing the new tokens.
+   */
+  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
+    ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
+      override def run(): Credentials = {
+        val creds = new Credentials()
+        val nextRenewal = credentialManager.obtainDelegationTokens(hadoopConf, creds)
+
+        val timeToWait = SparkHadoopUtil.nextCredentialRenewalTime(nextRenewal, sparkConf) -
+          System.currentTimeMillis()
+        scheduleRenewal(timeToWait)
+        creds
       }
     })
-
-    val currTime = System.currentTimeMillis()
-    val timeOfNextUpdate = if (nearestNextRenewalTime <= currTime) {
-      // If next renewal time is earlier than current time, we set next renewal time to current
-      // time, this will trigger next renewal immediately. Also set next update time to current
-      // time. There still has a gap between token renewal and update will potentially introduce
-      // issue.
-      logWarning(s"Next credential renewal time ($nearestNextRenewalTime) is earlier than " +
-        s"current time ($currTime), which is unexpected, please check your credential renewal " +
-        "related configurations in the target services.")
-      timeOfNextRenewal = currTime
-      currTime
-    } else {
-      // Next valid renewal time is about 75% of credential renewal time, and update time is
-      // slightly later than valid renewal time (80% of renewal time).
-      timeOfNextRenewal =
-        SparkHadoopUtil.getDateOfNextUpdate(nearestNextRenewalTime, 0.75)
-      SparkHadoopUtil.getDateOfNextUpdate(nearestNextRenewalTime, 0.8)
-    }
-
-    // Add the temp credentials back to the original ones.
-    UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
-    val remoteFs = FileSystem.get(freshHadoopConf)
-    // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
-    // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
-    // and update the lastCredentialsFileSuffix.
-    if (lastCredentialsFileSuffix == 0) {
-      hadoopUtil.listFilesSorted(
-        remoteFs, credentialsPath.getParent,
-        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-        .lastOption.foreach { status =>
-        lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath)
-      }
-    }
-    val nextSuffix = lastCredentialsFileSuffix + 1
-
-    val tokenPathStr =
-      credentialsFile + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM +
-        timeOfNextUpdate.toLong.toString + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM +
-          nextSuffix
-    val tokenPath = new Path(tokenPathStr)
-    val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-
-    logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
-    val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    credentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf)
-    logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
-    remoteFs.rename(tempTokenPath, tokenPath)
-    logInfo("Delegation token file rename complete.")
-    lastCredentialsFileSuffix = nextSuffix
   }
 
-  def stop(): Unit = {
-    credentialRenewerThread.shutdown()
+  private def doLogin(): UserGroupInformation = {
+    logInfo(s"Attempting to login to KDC using principal: $principal")
+    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+    logInfo("Successfully logged into KDC.")
+    ugi
   }
+
 }
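
The scheduling arithmetic in obtainTokensAndScheduleRenewal and scheduleRenewal above can be sketched in isolation. This is a minimal illustration, not the Spark implementation: the object and method names are hypothetical, the ratio is passed in explicitly instead of being read from SparkConf, and the formula is an assumption based on the "75% of the renewal interval" wording in the class comment.

```scala
object RenewalDelaySketch {
  // Hypothetical helper: the point in time at which `ratio` of the interval
  // between `now` and `nextRenewal` has elapsed (assumed formula, see lead-in).
  def renewalTime(now: Long, nextRenewal: Long, ratio: Double): Long =
    now + (ratio * (nextRenewal - now)).toLong

  // Delay handed to the scheduler. A target that is already in the past is
  // clamped to zero, which mirrors math.max(0, delay) in scheduleRenewal.
  def delayMillis(now: Long, nextRenewal: Long, ratio: Double): Long =
    math.max(0L, renewalTime(now, nextRenewal, ratio) - now)

  def main(args: Array[String]): Unit = {
    val now = 1000000L
    val dayMs = 24L * 60 * 60 * 1000
    // Tokens renewable in one day: wait 18 hours.
    println(delayMillis(now, now + dayMs, 0.75)) // prints 64800000
    // Renewal date already passed: run immediately.
    println(delayMillis(now, now - 5000L, 0.75)) // prints 0
  }
}
```

Clamping at zero means a restarted AM that finds very old tokens fetches a new set right away rather than scheduling with a negative delay.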

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
deleted file mode 100644
index fe173df..0000000
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import java.util.concurrent.{Executors, TimeUnit}
-
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-private[spark] class CredentialUpdater(
-    sparkConf: SparkConf,
-    hadoopConf: Configuration,
-    credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
-
-  @volatile private var lastCredentialsFileSuffix = 0
-
-  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
-  private val freshHadoopConf =
-    SparkHadoopUtil.get.getConfBypassingFSCache(
-      hadoopConf, new Path(credentialsFile).toUri.getScheme)
-
-  private val credentialUpdater =
-    Executors.newSingleThreadScheduledExecutor(
-      ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
-
-  // This thread wakes up and picks up new credentials from HDFS, if any.
-  private val credentialUpdaterRunnable =
-    new Runnable {
-      override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
-    }
-
-  /** Start the credential updater task */
-  def start(): Unit = {
-    val startTime = sparkConf.get(CREDENTIALS_UPDATE_TIME)
-    val remainingTime = startTime - System.currentTimeMillis()
-    if (remainingTime <= 0) {
-      credentialUpdater.schedule(credentialUpdaterRunnable, 1, TimeUnit.MINUTES)
-    } else {
-      logInfo(s"Scheduling credentials refresh from HDFS in $remainingTime ms.")
-      credentialUpdater.schedule(credentialUpdaterRunnable, remainingTime, TimeUnit.MILLISECONDS)
-    }
-  }
-
-  private def updateCredentialsIfRequired(): Unit = {
-    val timeToNextUpdate = try {
-      val credentialsFilePath = new Path(credentialsFile)
-      val remoteFs = FileSystem.get(freshHadoopConf)
-      SparkHadoopUtil.get.listFilesSorted(
-        remoteFs, credentialsFilePath.getParent,
-        credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-        .lastOption.map { credentialsStatus =>
-          val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
-          if (suffix > lastCredentialsFileSuffix) {
-            logInfo("Reading new credentials from " + credentialsStatus.getPath)
-            val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
-            lastCredentialsFileSuffix = suffix
-            UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
-            logInfo("Credentials updated from credentials file.")
-
-            val remainingTime = (getTimeOfNextUpdateFromFileName(credentialsStatus.getPath)
-              - System.currentTimeMillis())
-            if (remainingTime <= 0) TimeUnit.MINUTES.toMillis(1) else remainingTime
-          } else {
-            // If current credential file is older than expected, sleep 1 hour and check again.
-            TimeUnit.HOURS.toMillis(1)
-          }
-      }.getOrElse {
-        // Wait for 1 minute to check again if there's no credential file currently
-        TimeUnit.MINUTES.toMillis(1)
-      }
-    } catch {
-      // Since the file may get deleted while we are reading it, catch the Exception and come
-      // back in an hour to try again
-      case NonFatal(e) =>
-        logWarning("Error while trying to update credentials, will try again in 1 hour", e)
-        TimeUnit.HOURS.toMillis(1)
-    }
-
-    logInfo(s"Scheduling credentials refresh from HDFS in $timeToNextUpdate ms.")
-    credentialUpdater.schedule(
-      credentialUpdaterRunnable, timeToNextUpdate, TimeUnit.MILLISECONDS)
-  }
-
-  private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
-    val stream = remoteFs.open(tokenPath)
-    try {
-      val newCredentials = new Credentials()
-      newCredentials.readTokenStorageStream(stream)
-      newCredentials
-    } finally {
-      stream.close()
-    }
-  }
-
-  private def getTimeOfNextUpdateFromFileName(credentialsPath: Path): Long = {
-    val name = credentialsPath.getName
-    val index = name.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM)
-    val slice = name.substring(0, index)
-    val last2index = slice.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM)
-    name.substring(last2index + 1, index).toLong
-  }
-
-  def stop(): Unit = {
-    credentialUpdater.shutdown()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
index 163cfb4..d4eeb6b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
@@ -22,11 +22,11 @@ import java.util.ServiceLoader
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.security.Credentials
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
@@ -37,11 +37,10 @@ import org.apache.spark.util.Utils
  */
 private[yarn] class YARNHadoopDelegationTokenManager(
     sparkConf: SparkConf,
-    hadoopConf: Configuration,
-    fileSystems: Configuration => Set[FileSystem]) extends Logging {
+    hadoopConf: Configuration) extends Logging {
 
-  private val delegationTokenManager =
-    new HadoopDelegationTokenManager(sparkConf, hadoopConf, fileSystems)
+  private val delegationTokenManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf,
+    conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
 
   // public for testing
   val credentialProviders = getCredentialProviders

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 0c6206e..06e54a2 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.hadoop.yarn.api.records.YarnApplicationState
 
 import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkAppHandle
@@ -62,12 +62,6 @@ private[spark] class YarnClientSchedulerBackend(
     super.start()
     waitForApplication()
 
-    // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
-    // reads the credentials from HDFS, just like the executors and updates its own credentials
-    // cache.
-    if (conf.contains("spark.yarn.credentials.file")) {
-      YarnSparkHadoopUtil.startCredentialUpdater(conf)
-    }
     monitorThread = asyncMonitorApplication()
     monitorThread.start()
   }
@@ -153,7 +147,6 @@ private[spark] class YarnClientSchedulerBackend(
     client.reportLauncherState(SparkAppHandle.State.FINISHED)
 
     super.stop()
-    YarnSparkHadoopUtil.stopCredentialUpdater()
     client.stop()
     logInfo("Stopped")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index bb615c3..63bea3e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -24,9 +24,11 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
 
 import org.apache.spark.SparkContext
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
@@ -70,6 +72,7 @@ private[spark] abstract class YarnSchedulerBackend(
   /** Scheduler extension services. */
   private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
 
+
   /**
    * Bind to YARN. This *must* be done before calling [[start()]].
    *
@@ -263,8 +266,13 @@ private[spark] abstract class YarnSchedulerBackend(
           logWarning(s"Requesting driver to remove executor $executorId for reason $reason")
           driverEndpoint.send(r)
         }
-    }
 
+      case u @ UpdateDelegationTokens(tokens) =>
+        // Add the tokens to the current user and send a message to the scheduler so that it
+        // notifies all registered executors of the new tokens.
+        SparkHadoopUtil.get.addDelegationTokens(tokens, sc.conf)
+        driverEndpoint.send(u)
+    }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       case r: RequestExecutors =>
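
As a rough illustration of the UpdateDelegationTokens handling in the hunk above: the endpoint first applies the new tokens locally, then forwards the same message so that every registered executor receives them. This is only a sketch under stated assumptions. The names UpdateTokens, ExecutorStub and DriverStub are hypothetical stand-ins, not Spark classes, and the real code goes through SparkHadoopUtil and the driver RPC endpoint rather than direct method calls.

```scala
// Hypothetical stand-ins for illustration only; none of these are Spark's real classes.
final case class UpdateTokens(tokens: Array[Byte])

// Models an executor that keeps the most recent serialized token set it was sent.
final class ExecutorStub(val id: String) {
  @volatile private var current: Array[Byte] = Array.emptyByteArray
  def receive(msg: UpdateTokens): Unit = { current = msg.tokens }
  def tokens: Array[Byte] = current
}

// Models the driver side: update local credentials first, then fan out to executors.
final class DriverStub(executors: Seq[ExecutorStub]) {
  @volatile private var current: Array[Byte] = Array.emptyByteArray
  def tokens: Array[Byte] = current

  def receive(msg: UpdateTokens): Unit = {
    current = msg.tokens               // step 1: apply the tokens locally
    executors.foreach(_.receive(msg))  // step 2: notify every registered executor
  }
}

object TokenFanOutDemo {
  def main(args: Array[String]): Unit = {
    val execs = Seq(new ExecutorStub("1"), new ExecutorStub("2"))
    val driver = new DriverStub(execs)
    driver.receive(UpdateTokens("new-tokens".getBytes("UTF-8")))
    execs.foreach(e => println(s"executor ${e.id}: ${new String(e.tokens, "UTF-8")}"))
  }
}
```

Pushing the tokens from one place means executors no longer need to track an update time of their own, which is the failure mode the commit message describes for re-attempts.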

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
index 3c7cdc0..9fa749b 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
@@ -22,7 +22,6 @@ import org.apache.hadoop.security.Credentials
 import org.scalatest.Matchers
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 
 class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
   private var credentialManager: YARNHadoopDelegationTokenManager = null
@@ -36,11 +35,7 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers
   }
 
   test("Correctly loads credential providers") {
-    credentialManager = new YARNHadoopDelegationTokenManager(
-      sparkConf,
-      hadoopConf,
-      conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
-
+    credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
     credentialManager.credentialProviders.get("yarn-test") should not be (None)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5fa43847/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index aed67a5..3703a87 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -57,9 +57,6 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
       "spark.yarn.jars",
       "spark.yarn.keytab",
       "spark.yarn.principal",
-      "spark.yarn.credentials.file",
-      "spark.yarn.credentials.renewalTime",
-      "spark.yarn.credentials.updateTime",
       "spark.ui.filters",
       "spark.mesos.driver.frameworkId")
 

