Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r228398489
  
    --- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
    @@ -110,32 +209,105 @@ private[spark] class HadoopDelegationTokenManager(
       }
     
       /**
    -   * Get delegation token provider for the specified service.
    +   * List of file systems for which to obtain delegation tokens. The base 
implementation
    +   * returns just the default file system in the given Hadoop 
configuration.
        */
    -  def getServiceDelegationTokenProvider(service: String): 
Option[HadoopDelegationTokenProvider] = {
    -    delegationTokenProviders.get(service)
    +  protected def fileSystemsToAccess(): Set[FileSystem] = {
    +    Set(FileSystem.get(hadoopConf))
    +  }
    +
    +  private def scheduleRenewal(delay: Long): Unit = {
    +    val _delay = math.max(0, delay)
    +    logInfo(s"Scheduling login from keytab in 
${UIUtils.formatDuration(delay)}.")
    +
    +    val renewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        updateTokensTask()
    +      }
    +    }
    +    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
       }
     
       /**
    -   * Writes delegation tokens to creds.  Delegation tokens are fetched 
from all registered
    -   * providers.
    -   *
    -   * @param hadoopConf hadoop Configuration
    -   * @param creds Credentials that will be updated in place (overwritten)
    -   * @return Time after which the fetched delegation tokens should be 
renewed.
    +   * Periodic task to login to the KDC and create new delegation tokens. 
Re-schedules itself
    +   * to fetch the next set of tokens when needed.
        */
    -  def obtainDelegationTokens(
    -      hadoopConf: Configuration,
    -      creds: Credentials): Long = {
    -    delegationTokenProviders.values.flatMap { provider =>
    -      if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
    -        provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
    +  private def updateTokensTask(): Unit = {
    +    try {
    +      val freshUGI = doLogin()
    +      val creds = obtainTokensAndScheduleRenewal(freshUGI)
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        logInfo("Updating delegation tokens.")
    +        driver.send(UpdateDelegationTokens(tokens))
           } else {
    -        logDebug(s"Service ${provider.serviceName} does not require a 
token." +
    -          s" Check your configuration to see if security is disabled or 
not.")
    -        None
    +        // This shouldn't really happen, since the driver should register 
way before tokens expire.
    +        logWarning("Delegation tokens close to expiration but no driver 
has registered yet.")
    +        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
           }
    -    }.foldLeft(Long.MaxValue)(math.min)
    +    } catch {
    +      case e: Exception =>
    +        val delay = 
TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
    +        logWarning(s"Failed to update tokens, will try again in 
${UIUtils.formatDuration(delay)}!" +
    +          " If this happens too often tasks will fail.", e)
    +        scheduleRenewal(delay)
    +    }
    +  }
    +
    +  /**
    +   * Obtain new delegation tokens from the available providers. Schedules 
a new task to fetch
    +   * new tokens before the new set expires.
    +   *
    +   * @return Credentials containing the new tokens.
    +   */
    +  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): 
Credentials = {
    +    ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
    +      override def run(): Credentials = {
    +        val creds = new Credentials()
    +        val nextRenewal = obtainDelegationTokens(creds)
    +
    +        // Calculate the time when new credentials should be created, 
based on the configured
    +        // ratio.
    +        val now = System.currentTimeMillis
    +        val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
    +        val adjustedNextRenewal = (now + (ratio * (nextRenewal - 
now))).toLong
    +
    +        scheduleRenewal(adjustedNextRenewal - now)
    --- End diff --
    
    you're adding `now` and subtracting it off again, instead you could do
    
    ```scala
    val adjustedRenewalDelay = (ratio * (nextRenewal - now)).toLong
    scheduleRenewal(adjustedRenewalDelay)
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to