Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4688#discussion_r25915171
  
    --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
    @@ -82,6 +102,180 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
         if (credentials != null) credentials.getSecretKey(new Text(key)) else null
       }
     
    +  private[spark] override def scheduleLoginFromKeytab(): Unit = {
    +    val principal = System.getenv("SPARK_PRINCIPAL")
    +    val keytab = System.getenv("SPARK_KEYTAB")
    +    if (principal != null) {
    +      val delegationTokenRenewerThread =
    +        new Runnable {
    +          override def run(): Unit = {
    +            if (!loggedInViaKeytab) {
    +              // Keytab is copied by YARN to the working directory of the AM, so full path is
    +              // not needed.
    +              loggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
    +                principal, keytab)
    +              loggedInViaKeytab = true
    +            }
    +            val nns = getNameNodesToAccess(sparkConf)
    +            val newCredentials = loggedInUGI.getCredentials
    +            obtainTokensForNamenodes(nns, conf, newCredentials)
    +            val remoteFs = FileSystem.get(conf)
    +            val stagingDirPath =
    +              new Path(remoteFs.getHomeDirectory, System.getenv("SPARK_YARN_STAGING_DIR"))
    +            val tokenPathStr = sparkConf.get("spark.yarn.credentials.file")
    +            val tokenPath = new Path(stagingDirPath.toString, tokenPathStr)
    +            val tempTokenPath = new Path(stagingDirPath.toString, tokenPathStr + ".tmp")
    +            val stream = remoteFs.create(tempTokenPath, true)
    +            // Now write this out to HDFS
    +            newCredentials.writeTokenStorageToStream(stream)
    +            stream.hflush()
    +            stream.close()
    +            // HDFS does reads by inodes now, so just doing a rename should be fine. But I could
    +            // not find a clear explanation of when the blocks on HDFS are deleted. Ideally, we
    +            // would not need this, but just be defensive to ensure we don't mess up the
    +            // credentials. So create a file to show that we are currently updating - if the
    +            // reader sees this file, they go away and come back later. Then delete old token and
    +            // rename the old to new.
    +            val updatingPath = new Path(stagingDirPath, "_UPDATING")
    +            if (remoteFs.exists(updatingPath)) {
    +              remoteFs.delete(updatingPath, true)
    +            }
    +            remoteFs.create(updatingPath).close()
    +            if (remoteFs.exists(tokenPath)) {
    +              remoteFs.delete(tokenPath, true)
    +            }
    +            remoteFs.rename(tempTokenPath, tokenPath)
    +            remoteFs.delete(updatingPath, true)
    +            delegationTokenRenewer.schedule(
    +              this, (0.75 * (getLatestValidity - System.currentTimeMillis())).toLong,
    +              TimeUnit.MILLISECONDS)
    +          }
    +        }
    +      val timeToRenewal = (0.75 * (getLatestValidity - System.currentTimeMillis())).toLong
    +      delegationTokenRenewer.schedule(
    +        delegationTokenRenewerThread, timeToRenewal, TimeUnit.MILLISECONDS)
    +    }
    +  }
    +
    +  override def updateCredentialsIfRequired(): Unit = {
    +    try {
    +      val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
    +      if (credentialsFile != null && !credentialsFile.isEmpty) {
    +        val remoteFs = FileSystem.get(conf)
    +        val sparkStagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
    +        val stagingDirPath = new Path(remoteFs.getHomeDirectory, sparkStagingDir)
    +        val credentialsFilePath = new Path(stagingDirPath, credentialsFile)
    +        // If an update is currently in progress, come back later!
    +        if (remoteFs.exists(new Path(stagingDirPath, "_UPDATING"))) {
    +          delegationTokenRenewer.schedule(delegationTokenExecuterUpdaterThread, 1, TimeUnit.HOURS)
    +        }
    +        // Now check if the file exists, if it does go get the credentials from there
    +        if (remoteFs.exists(credentialsFilePath)) {
    +          val status = remoteFs.getFileStatus(credentialsFilePath)
    +          val modTimeAtStart = status.getModificationTime
    +          if (modTimeAtStart > lastCredentialsRefresh) {
    +            val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsFilePath)
    +            val newStatus = remoteFs.getFileStatus(credentialsFilePath)
    +            // File was updated after we started reading it, let's come back later and try to read
    +            // it.
    +            if (newStatus.getModificationTime != modTimeAtStart) {
    +              delegationTokenRenewer
    +                .schedule(delegationTokenExecuterUpdaterThread, 1, TimeUnit.HOURS)
    +            } else {
    +              UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
    +              lastCredentialsRefresh = status.getModificationTime
    +              val totalValidity = getLatestValidity - lastCredentialsRefresh
    +              val timeToRunRenewal = lastCredentialsRefresh + (0.8 * totalValidity).toLong
    +              val timeFromNowToRenewal = timeToRunRenewal - System.currentTimeMillis()
    +              delegationTokenRenewer.schedule(delegationTokenExecuterUpdaterThread,
    +                timeFromNowToRenewal, TimeUnit.MILLISECONDS)
    +            }
    +          } else {
    +            // Check every hour to see if new credentials arrived.
    +            delegationTokenRenewer.schedule(delegationTokenExecuterUpdaterThread, 1, TimeUnit.HOURS)
    +          }
    +        }
    +      }
    +    } catch {
    +      // Since the file may get deleted while we are reading it, catch the Exception and come
    +      // back in an hour to try again
    +      case e: Exception =>
    +        logWarning(
    +          "Error encountered while trying to update credentials, will try again in 1 hour", e)
    +        delegationTokenRenewer.schedule(delegationTokenExecuterUpdaterThread, 1, TimeUnit.HOURS)
    +    }
    +  }
    +
    +  private[spark] def getCredentialsFromHDFSFile(
    +    remoteFs: FileSystem,
    +    tokenPath: Path
    +  ): Credentials = {
    --- End diff --
    
    nit: I think the style is for these to go on the previous line (and for the previous lines to be double-indented). That is, if all this doesn't fit in a single line...
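
    Something like this, as a sketch of the suggested style (same names as above, body elided):

        private[spark] def getCredentialsFromHDFSFile(
            remoteFs: FileSystem,
            tokenPath: Path): Credentials = {
          ...
        }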

