Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4688#discussion_r25038157
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
---
    @@ -121,6 +132,100 @@ class SparkHadoopUtil extends Logging {
         UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
       }
     
    +  def setPrincipalAndKeytabForLogin(principal: String, keytab: String): 
Unit ={
    +    loginPrincipal = Option(principal)
    +    keytabFile = Option(keytab)
    +  }
    +
    +  private[spark] def scheduleLoginFromKeytab(callback: 
(SerializableBuffer)  => Unit): Unit = {
    +
    +    loginPrincipal match {
    +      case Some(principal) =>
    +        val keytab = keytabFile.get
    +        val remoteFs = FileSystem.get(conf)
    +        val remoteKeytabPath = new Path(
    +          remoteFs.getHomeDirectory, System.getenv("SPARK_STAGING_DIR") + 
Path.SEPARATOR + keytab)
    +        val localFS = FileSystem.getLocal(conf)
    +        // At this point, SparkEnv is likely not initialized, so create a 
dir, put the keytab there.
    +        val tempDir = Utils.createTempDir()
    +        val localURI = new URI(tempDir.getAbsolutePath + Path.SEPARATOR + 
keytab)
    +        val qualifiedURI = new  URI(localFS.makeQualified(new 
Path(localURI)).toString)
    +        FileUtil.copy(
    +          remoteFs, remoteKeytabPath, localFS, new Path(qualifiedURI), 
false, false, conf)
    +        // Get the current credentials, find out when they expire.
    +        val creds = UserGroupInformation.getCurrentUser.getCredentials
    +        val credStream = new ByteArrayOutputStream()
    +        creds.writeTokenStorageToStream(new DataOutputStream(credStream))
    +        val in = new DataInputStream(new 
ByteArrayInputStream(credStream.toByteArray))
    +        val tokenIdentifier = new DelegationTokenIdentifier()
    +        tokenIdentifier.readFields(in)
    +        val timeToRenewal = (0.6 * (tokenIdentifier.getMaxDate - 
System.currentTimeMillis())).toLong
    +        Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    +          override def newThread(r: Runnable): Thread = {
    +            val t = new Thread(r)
    +            t.setName("Delegation Token Refresh Thread")
    +            t.setDaemon(true)
    +            t
    +          }
    +        }).scheduleWithFixedDelay(new Runnable {
    +          override def run(): Unit = {
    +            if (!loggedInViaKeytab.get()) {
    +              loginUserFromKeytab(principal, tempDir.getAbsolutePath + 
Path.SEPARATOR + keytab)
    +              loggedInViaKeytab.set(true)
    +            }
    +            val nns = getNameNodesToAccess(sparkConf) + remoteKeytabPath
    +            val newCredentials = new Credentials()
    +            obtainTokensForNamenodes(nns, conf, newCredentials)
    +            // Now write this out via Akka to executors.
    +            val outputStream = new ByteArrayOutputStream()
    +            newCredentials.writeTokenStorageToStream(new 
DataOutputStream(outputStream))
    +            callback(new 
SerializableBuffer(ByteBuffer.wrap(outputStream.toByteArray)))
    +          }
    +        }, timeToRenewal, timeToRenewal, TimeUnit.MILLISECONDS)
    +      case None =>
    +    }
    +  }
    +
    +  /**
    +   * Get the list of namenodes the user may access.
    +   */
    +  def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
    +    sparkConf.get("spark.yarn.access.namenodes", "")
    +      .split(",")
    +      .map(_.trim())
    +      .filter(!_.isEmpty)
    +      .map(new Path(_))
    +      .toSet
    +  }
    +
    +  def getTokenRenewer(conf: Configuration): String = {
    +    val delegTokenRenewer = Master.getMasterPrincipal(conf)
    --- End diff --
    
    It does not seem like this is why the build is failing. It failed MiMa 
checks for some reason. I will look at it in some time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to