Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/4688#discussion_r25039668 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala --- @@ -121,6 +132,100 @@ class SparkHadoopUtil extends Logging { UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename) } + def setPrincipalAndKeytabForLogin(principal: String, keytab: String): Unit ={ + loginPrincipal = Option(principal) + keytabFile = Option(keytab) + } + + private[spark] def scheduleLoginFromKeytab(callback: (SerializableBuffer) => Unit): Unit = { + + loginPrincipal match { + case Some(principal) => + val keytab = keytabFile.get + val remoteFs = FileSystem.get(conf) + val remoteKeytabPath = new Path( + remoteFs.getHomeDirectory, System.getenv("SPARK_STAGING_DIR") + Path.SEPARATOR + keytab) + val localFS = FileSystem.getLocal(conf) + // At this point, SparkEnv is likely no initialized, so create a dir, put the keytab there. + val tempDir = Utils.createTempDir() + val localURI = new URI(tempDir.getAbsolutePath + Path.SEPARATOR + keytab) + val qualifiedURI = new URI(localFS.makeQualified(new Path(localURI)).toString) + FileUtil.copy( + remoteFs, remoteKeytabPath, localFS, new Path(qualifiedURI), false, false, conf) + // Get the current credentials, find out when they expire. 
+ val creds = UserGroupInformation.getCurrentUser.getCredentials + val credStream = new ByteArrayOutputStream() + creds.writeTokenStorageToStream(new DataOutputStream(credStream)) + val in = new DataInputStream(new ByteArrayInputStream(credStream.toByteArray)) + val tokenIdentifier = new DelegationTokenIdentifier() + tokenIdentifier.readFields(in) + val timeToRenewal = (0.6 * (tokenIdentifier.getMaxDate - System.currentTimeMillis())).toLong + Executors.newSingleThreadScheduledExecutor(new ThreadFactory { + override def newThread(r: Runnable): Thread = { + val t = new Thread(r) + t.setName("Delegation Token Refresh Thread") + t.setDaemon(true) + t + } + }).scheduleWithFixedDelay(new Runnable { + override def run(): Unit = { + if (!loggedInViaKeytab.get()) { + loginUserFromKeytab(principal, tempDir.getAbsolutePath + Path.SEPARATOR + keytab) + loggedInViaKeytab.set(true) + } + val nns = getNameNodesToAccess(sparkConf) + remoteKeytabPath + val newCredentials = new Credentials() + obtainTokensForNamenodes(nns, conf, newCredentials) + // Now write this out via Akka to executors. + val outputStream = new ByteArrayOutputStream() + newCredentials.writeTokenStorageToStream(new DataOutputStream(outputStream)) + callback(new SerializableBuffer(ByteBuffer.wrap(outputStream.toByteArray))) + } + }, timeToRenewal, timeToRenewal, TimeUnit.MILLISECONDS) + case None => + } + } + + /** + * Get the list of namenodes the user may access. + */ + def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = { + sparkConf.get("spark.yarn.access.namenodes", "") + .split(",") + .map(_.trim()) + .filter(!_.isEmpty) + .map(new Path(_)) + .toSet + } + + def getTokenRenewer(conf: Configuration): String = { + val delegTokenRenewer = Master.getMasterPrincipal(conf) --- End diff -- We can probably move it out to a util class in the YARN module, and we should be ok, since this code is used only by the AM.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org