Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/14065#discussion_r73931146
--- Diff:
yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
---
@@ -53,50 +53,58 @@ private[spark] class CredentialUpdater(
override def run(): Unit =
Utils.logUncaughtExceptions(updateCredentialsIfRequired())
}
- @volatile private var timeOfNextUpdate =
sparkConf.get(CREDENTIALS_UPDATE_TIME)
+ /** Start the credentail updater thread periodically to get the
credentials */
+ def bootstrap(): Unit = {
+ val bootstrapTime = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
+ val remainingTime = bootstrapTime - System.currentTimeMillis()
+ if (remainingTime <= 0) {
+ // We just checked for new credentials but none were there, wait a
minute and retry.
+ // This handles the shutdown case where the staging directory may
have been removed(see
+ // SPARK-12316 for more details).
+ credentialUpdater.schedule(credentialUpdaterRunnable, 1,
TimeUnit.MINUTES)
+ } else {
+ logInfo(s"Scheduling credentials refresh from HDFS in $remainingTime
millis.")
+ credentialUpdater.schedule(credentialUpdaterRunnable, remainingTime,
TimeUnit.MILLISECONDS)
+ }
+ }
- def updateCredentialsIfRequired(): Unit = {
- try {
+ private def updateCredentialsIfRequired(): Unit = {
+ val timeToNextUpdate = try {
val credentialsFilePath = new Path(credentialsFile)
val remoteFs = FileSystem.get(freshHadoopConf)
SparkHadoopUtil.get.listFilesSorted(
remoteFs, credentialsFilePath.getParent,
credentialsFilePath.getName,
SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
- .lastOption.foreach { credentialsStatus =>
+ .lastOption.map { credentialsStatus =>
val suffix =
SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
if (suffix > lastCredentialsFileSuffix) {
logInfo("Reading new credentials from " +
credentialsStatus.getPath)
val newCredentials = getCredentialsFromHDFSFile(remoteFs,
credentialsStatus.getPath)
lastCredentialsFileSuffix = suffix
UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
logInfo("Credentials updated from credentials file.")
- timeOfNextUpdate =
getTimeOfNextUpdateFromFileName(credentialsStatus.getPath)
+
+ val remainingTime =
+ getTimeOfNextUpdateFromFileName(credentialsStatus.getPath) -
System.currentTimeMillis()
+ if (remainingTime <= 0) 60L * 1000 else remainingTime
--- End diff --
`TimeUnit.HOUR.toMillis(1)`
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]