Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22624#discussion_r228398489 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -110,32 +209,105 @@ private[spark] class HadoopDelegationTokenManager( } /** - * Get delegation token provider for the specified service. + * List of file systems for which to obtain delegation tokens. The base implementation + * returns just the default file system in the given Hadoop configuration. */ - def getServiceDelegationTokenProvider(service: String): Option[HadoopDelegationTokenProvider] = { - delegationTokenProviders.get(service) + protected def fileSystemsToAccess(): Set[FileSystem] = { + Set(FileSystem.get(hadoopConf)) + } + + private def scheduleRenewal(delay: Long): Unit = { + val _delay = math.max(0, delay) + logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.") + + val renewalTask = new Runnable() { + override def run(): Unit = { + updateTokensTask() + } + } + renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS) } /** - * Writes delegation tokens to creds. Delegation tokens are fetched from all registered - * providers. - * - * @param hadoopConf hadoop Configuration - * @param creds Credentials that will be updated in place (overwritten) - * @return Time after which the fetched delegation tokens should be renewed. + * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself + * to fetch the next set of tokens when needed. 
*/ - def obtainDelegationTokens( - hadoopConf: Configuration, - creds: Credentials): Long = { - delegationTokenProviders.values.flatMap { provider => - if (provider.delegationTokensRequired(sparkConf, hadoopConf)) { - provider.obtainDelegationTokens(hadoopConf, sparkConf, creds) + private def updateTokensTask(): Unit = { + try { + val freshUGI = doLogin() + val creds = obtainTokensAndScheduleRenewal(freshUGI) + val tokens = SparkHadoopUtil.get.serialize(creds) + + val driver = driverRef.get() + if (driver != null) { + logInfo("Updating delegation tokens.") + driver.send(UpdateDelegationTokens(tokens)) } else { - logDebug(s"Service ${provider.serviceName} does not require a token." + - s" Check your configuration to see if security is disabled or not.") - None + // This shouldn't really happen, since the driver should register way before tokens expire. + logWarning("Delegation tokens close to expiration but no driver has registered yet.") + SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf) } - }.foldLeft(Long.MaxValue)(math.min) + } catch { + case e: Exception => + val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT)) + logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" + + " If this happens too often tasks will fail.", e) + scheduleRenewal(delay) + } + } + + /** + * Obtain new delegation tokens from the available providers. Schedules a new task to fetch + * new tokens before the new set expires. + * + * @return Credentials containing the new tokens. + */ + private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = { + ugi.doAs(new PrivilegedExceptionAction[Credentials]() { + override def run(): Credentials = { + val creds = new Credentials() + val nextRenewal = obtainDelegationTokens(creds) + + // Calculate the time when new credentials should be created, based on the configured + // ratio. 
+ val now = System.currentTimeMillis + val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO) + val adjustedNextRenewal = (now + (ratio * (nextRenewal - now))).toLong + + scheduleRenewal(adjustedNextRenewal - now) --- End diff -- You're adding `now` and then subtracting it again; instead, you could compute the delay directly: ```scala val adjustedRenewalDelay = (ratio * (nextRenewal - now)).toLong scheduleRenewal(adjustedRenewalDelay) ```
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org