Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/14065#discussion_r73931365
--- Diff:
yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
---
@@ -53,50 +53,58 @@ private[spark] class CredentialUpdater(
override def run(): Unit =
Utils.logUncaughtExceptions(updateCredentialsIfRequired())
}
- @volatile private var timeOfNextUpdate =
sparkConf.get(CREDENTIALS_UPDATE_TIME)
+ /** Start the credentail updater thread periodically to get the
credentials */
+ def bootstrap(): Unit = {
+ val bootstrapTime = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
+ val remainingTime = bootstrapTime - System.currentTimeMillis()
+ if (remainingTime <= 0) {
+ // We just checked for new credentials but none were there, wait a
minute and retry.
+ // This handles the shutdown case where the staging directory may
have been removed(see
+ // SPARK-12316 for more details).
+ credentialUpdater.schedule(credentialUpdaterRunnable, 1,
TimeUnit.MINUTES)
+ } else {
+ logInfo(s"Scheduling credentials refresh from HDFS in $remainingTime
millis.")
+ credentialUpdater.schedule(credentialUpdaterRunnable, remainingTime,
TimeUnit.MILLISECONDS)
+ }
+ }
- def updateCredentialsIfRequired(): Unit = {
- try {
+ private def updateCredentialsIfRequired(): Unit = {
+ val timeToNextUpdate = try {
val credentialsFilePath = new Path(credentialsFile)
val remoteFs = FileSystem.get(freshHadoopConf)
SparkHadoopUtil.get.listFilesSorted(
remoteFs, credentialsFilePath.getParent,
credentialsFilePath.getName,
SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
- .lastOption.foreach { credentialsStatus =>
+ .lastOption.map { credentialsStatus =>
val suffix =
SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
if (suffix > lastCredentialsFileSuffix) {
logInfo("Reading new credentials from " +
credentialsStatus.getPath)
val newCredentials = getCredentialsFromHDFSFile(remoteFs,
credentialsStatus.getPath)
lastCredentialsFileSuffix = suffix
UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
logInfo("Credentials updated from credentials file.")
- timeOfNextUpdate =
getTimeOfNextUpdateFromFileName(credentialsStatus.getPath)
+
+ val remainingTime =
+ getTimeOfNextUpdateFromFileName(credentialsStatus.getPath) -
System.currentTimeMillis()
+ if (remainingTime <= 0) 60L * 1000 else remainingTime
} else {
- // Check every hour to see if new credentials arrived.
- logInfo("Updated credentials were expected, but the AM has not
updated the " +
- "credentials yet, will check again in an hour.")
- credentialUpdater.schedule(credentialUpdaterRunnable, 1,
TimeUnit.HOURS)
- return
+ // If current credential file is elder than expected, sleep 1
hour and check again.
+ 3600L * 1000
}
- }
- val remainingTime = timeOfNextUpdate - System.currentTimeMillis()
- if (remainingTime <= 0) {
- // We just checked for new credentials but none were there, wait a
minute and retry.
- // This handles the shutdown case where the staging directory may
have been removed(see
- // SPARK-12316 for more details).
- credentialUpdater.schedule(credentialUpdaterRunnable, 1,
TimeUnit.MINUTES)
- } else {
- logInfo(s"Scheduling credentials refresh from HDFS in
$remainingTime millis.")
- credentialUpdater.schedule(
- credentialUpdaterRunnable, remainingTime, TimeUnit.MILLISECONDS)
+ }.getOrElse {
+ // Wait for 1 minute to check again if there's no credential file
currently
+ 60L * 1000
}
} catch {
// Since the file may get deleted while we are reading it, catch the
Exception and come
// back in an hour to try again
case NonFatal(e) =>
logWarning("Error while trying to update credentials, will try
again in 1 hour", e)
- credentialUpdater.schedule(credentialUpdaterRunnable, 1,
TimeUnit.HOURS)
+ 3600L * 1000
--- End diff --
`TimeUnit...` you get the gist.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]