Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4688#discussion_r25525993
  
    --- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
    @@ -82,6 +93,102 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
         if (credentials != null) credentials.getSecretKey(new Text(key)) else 
null
       }
     
    +  override def setPrincipalAndKeytabForLogin(principal: String, keytab: 
String): Unit = {
    +    loginPrincipal = Option(principal)
    +    keytabFile = Option(keytab)
    +  }
    +
    +  private[spark] override def scheduleLoginFromKeytab(
    +    callback: (SerializableBuffer)  => Unit): Unit = {
    +
    +    loginPrincipal match {
    +      case Some(principal) =>
    +        val keytab = keytabFile.get
    +        val remoteFs = FileSystem.get(conf)
    +        val remoteKeytabPath = new Path(
    +          remoteFs.getHomeDirectory, System.getenv("SPARK_STAGING_DIR") + 
Path.SEPARATOR + keytab)
    +        val localFS = FileSystem.getLocal(conf)
    +        // At this point, SparkEnv is likely no initialized, so create a 
dir, put the keytab there.
    +        val tempDir = Utils.createTempDir()
    +        val localURI = new URI(tempDir.getAbsolutePath + Path.SEPARATOR + 
keytab)
    +        val qualifiedURI = new  URI(localFS.makeQualified(new 
Path(localURI)).toString)
    +        FileUtil.copy(
    +          remoteFs, remoteKeytabPath, localFS, new Path(qualifiedURI), 
false, false, conf)
    +        // Get the current credentials, find out when they expire.
    +        val creds = UserGroupInformation.getCurrentUser.getCredentials
    +        val credStream = new ByteArrayOutputStream()
    +        creds.writeTokenStorageToStream(new DataOutputStream(credStream))
    +        val in = new DataInputStream(new 
ByteArrayInputStream(credStream.toByteArray))
    +        val tokenIdentifier = new DelegationTokenIdentifier()
    +        tokenIdentifier.readFields(in)
    +        val timeToRenewal = (0.6 * (tokenIdentifier.getMaxDate - 
System.currentTimeMillis())).toLong
    +        Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    +          override def newThread(r: Runnable): Thread = {
    +            val t = new Thread(r)
    +            t.setName("Delegation Token Refresh Thread")
    +            t.setDaemon(true)
    +            t
    +          }
    +        }).scheduleWithFixedDelay(new Runnable {
    +          override def run(): Unit = {
    +            if (!loggedInViaKeytab.get()) {
    --- End diff --
    
    It may not be the same thread but it's always single-threaded, right? 
Otherwise the `SingleThreadedExecutor` implementation has a problem...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to