Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/20657#discussion_r173236591
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---
@@ -18,221 +18,160 @@ package org.apache.spark.deploy.yarn.security
 
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.ThreadUtils
 
 /**
- * The following methods are primarily meant to make sure long-running apps like Spark
- * Streaming apps can run without interruption while accessing secured services. The
- * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
- * This method wakes up a thread that logs into the KDC
- * once 75% of the renewal interval of the original credentials used for the container
- * has elapsed. It then obtains new credentials and writes them to HDFS in a
- * pre-specified location - the prefix of which is specified in the sparkConf by
- * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
- * - each update goes to a new file, with a monotonically increasing suffix), also the
- * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
- * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
+ * A manager tasked with periodically updating delegation tokens needed by the application.
  *
- * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
- * called once 80% of the validity of the original credentials has elapsed. At that time the
- * executor finds the credentials file with the latest timestamp and checks if it has read those
- * credentials before (by keeping track of the suffix of the last file it read). If a new file has
- * appeared, it will read the credentials and update the currently running UGI with it. This
- * process happens again once 80% of the validity of this has expired.
+ * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
+ * without interruption while accessing secured services. It periodically logs in to the KDC with
+ * user-provided credentials, and contacts all the configured secure services to obtain delegation
+ * tokens to be distributed to the rest of the application.
+ *
+ * This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API
+ * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
+ * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
+ * needed. The check period can be overridden in the configuration.
+ *
+ * New delegation tokens are created once 75% of the renewal interval of the original tokens has
+ * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
+ * The driver is tasked with distributing the tokens to other processes that might need them.
  */
 private[yarn] class AMCredentialRenewer(
     sparkConf: SparkConf,
-    hadoopConf: Configuration,
-    credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
+    hadoopConf: Configuration) extends Logging {
 
-  private var lastCredentialsFileSuffix = 0
+  private val principal = sparkConf.get(PRINCIPAL).get
+  private val keytab = sparkConf.get(KEYTAB).get
+  private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
 
-  private val credentialRenewerThread: ScheduledExecutorService =
+  private val renewalExecutor: ScheduledExecutorService =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
 
-  private val hadoopUtil = SparkHadoopUtil.get
+  private val driverRef = new AtomicReference[RpcEndpointRef]()
 
-  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
-  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
-  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
-  private val freshHadoopConf =
-    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
+  private val renewalTask = new Runnable() {
+    override def run(): Unit = {
+      updateTokensTask()
+    }
+  }
 
-  @volatile private var timeOfNextRenewal: Long = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
+  def setDriverRef(ref: RpcEndpointRef): Unit = {
+    driverRef.set(ref)
+  }
 
   /**
-   * Schedule a login from the keytab and principal set using the --principal and --keytab
-   * arguments to spark-submit. This login happens only when the credentials of the current user
-   * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
-   * SparkConf to do the login. This method is a no-op in non-YARN mode.
+   * Start the token renewer. Upon start, the renewer will:
    *
+   *  - log in the configured user, and set up a task to keep that user's ticket renewed
+   *  - obtain delegation tokens from all available providers
+   *  - schedule a periodic task to update the tokens when needed.
+   *
+   * @return The newly logged in user.
   */
-  private[spark] def scheduleLoginFromKeytab(): Unit = {
-    val principal = sparkConf.get(PRINCIPAL).get
-    val keytab = sparkConf.get(KEYTAB).get
-
-    /**
-     * Schedule re-login and creation of new credentials. If credentials have already expired, this
-     * method will synchronously create new ones.
-     */
-    def scheduleRenewal(runnable: Runnable): Unit = {
-      // Run now!
-      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
-      if (remainingTime <= 0) {
-        logInfo("Credentials have expired, creating new ones now.")
-        runnable.run()
-      } else {
-        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
-        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
+  def start(): UserGroupInformation = {
+    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
+    val ugi = doLogin()
+
+    val tgtRenewalTask = new Runnable() {
+      override def run(): Unit = {
+        ugi.checkTGTAndReloginFromKeytab()
       }
     }
+    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
+    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
+      TimeUnit.SECONDS)
 
-    // This thread periodically runs on the AM to update the credentials on HDFS.
-    val credentialRenewerRunnable =
-      new Runnable {
-        override def run(): Unit = {
-          try {
-            writeNewCredentialsToHDFS(principal, keytab)
-            cleanupOldFiles()
-          } catch {
-            case e: Exception =>
-              // Log the error and try to write new tokens back in an hour
-              logWarning("Failed to write out new credentials to HDFS, will try again in an " +
-                "hour! If this happens too often tasks will fail.", e)
-              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
-              return
-          }
-          scheduleRenewal(this)
-        }
-      }
-    // Schedule update of credentials. This handles the case of updating the credentials right now
-    // as well, since the renewal interval will be 0, and the thread will get scheduled
-    // immediately.
-    scheduleRenewal(credentialRenewerRunnable)
+    val creds = obtainTokensAndScheduleRenewal(ugi)
+    ugi.addCredentials(creds)
+
+    // Transfer the original user's tokens to the new user, since that's needed to connect to
+    // YARN. Explicitly avoid overwriting tokens that already exist in the current user's
+    // credentials, since those were freshly obtained above (see SPARK-23361).
+    val existing = ugi.getCredentials()
+    existing.mergeAll(originalCreds)
+    ugi.addCredentials(existing)
+
+    ugi
+  }
+
+  def stop(): Unit = {
+    renewalExecutor.shutdown()
+  }
+
+  private def scheduleRenewal(delay: Long): Unit = {
+    val _delay = math.max(0, delay)
+    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
+    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
   }
 
-  // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
-  // least numFilesToKeep files are kept for safety
-  private def cleanupOldFiles(): Unit = {
-    import scala.concurrent.duration._
+  /**
+   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
+   * to fetch the next set of tokens when needed.
+   */
+  private def updateTokensTask(): Unit = {
     try {
-      val remoteFs = FileSystem.get(freshHadoopConf)
-      val credentialsPath = new Path(credentialsFile)
-      val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis
-      hadoopUtil.listFilesSorted(
-        remoteFs, credentialsPath.getParent,
-        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-        .dropRight(numFilesToKeep)
-        .takeWhile(_.getModificationTime < thresholdTime)
-        .foreach(x => remoteFs.delete(x.getPath, true))
+      val freshUGI = doLogin()
+      val creds = obtainTokensAndScheduleRenewal(freshUGI)
+      val tokens = SparkHadoopUtil.get.serialize(creds)
+
+      val driver = driverRef.get()
+      if (driver != null) {
+        logInfo("Updating delegation tokens.")
+        driver.send(UpdateDelegationTokens(tokens))
+      } else {
+        // This shouldn't really happen, since the driver should register way before tokens expire
+        // (or the AM should time out the application).
+        logWarning("Delegation tokens close to expiration but no driver has registered yet.")
+        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
+      }
     } catch {
-      // Such errors are not fatal, so don't throw. Make sure they are logged though
       case e: Exception =>
-        logWarning("Error while attempting to cleanup old credentials. If you are seeing many " +
-          "such warnings there may be an issue with your HDFS cluster.", e)
+        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
+        logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
+          " If this happens too often tasks will fail.", e)
+        scheduleRenewal(delay)
     }
   }
 
-  private def writeNewCredentialsToHDFS(principal: String, keytab: String): Unit = {
-    // Keytab is copied by YARN to the working directory of the AM, so full path is
-    // not needed.
-
-    // HACK:
--- End diff ---
Not sure I understand the question. This comment talks about a lot of things. The only part that still applies is using a new UGI to get new delegation tokens. That's not really a hack, that's just how the API works...
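
For reference, the pattern in question looks roughly like this. This is just a minimal sketch (the object and method names are made up, and HDFS is hard-coded as the one example service; the real code goes through all the configured token providers):

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

object TokenFetchExample {
  // Fetch a fresh set of delegation tokens by logging in a brand new UGI and
  // requesting the tokens as that user. A new UGI starts with an empty set of
  // credentials, so the fresh tokens don't get mixed up with whatever is
  // already cached in the process-wide login user.
  def obtainFreshTokens(
      principal: String,
      keytab: String,
      renewer: String,
      hadoopConf: Configuration): Credentials = {
    val freshUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
    freshUGI.doAs(new PrivilegedExceptionAction[Credentials] {
      override def run(): Credentials = {
        val creds = new Credentials()
        // Every secure service the app talks to would be contacted here; HDFS
        // is just one example. The renewer is normally the YARN RM principal.
        FileSystem.get(hadoopConf).addDelegationTokens(renewer, creds)
        creds
      }
    })
  }
}
```

That's essentially what `doLogin()` plus `obtainTokensAndScheduleRenewal()` do in this patch, with the per-service logic living in the token providers.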
---