Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4688#discussion_r25037138 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala --- @@ -121,6 +132,100 @@ class SparkHadoopUtil extends Logging { UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename) } + def setPrincipalAndKeytabForLogin(principal: String, keytab: String): Unit ={ + loginPrincipal = Option(principal) + keytabFile = Option(keytab) + } + + private[spark] def scheduleLoginFromKeytab(callback: (SerializableBuffer) => Unit): Unit = { + + loginPrincipal match { + case Some(principal) => + val keytab = keytabFile.get + val remoteFs = FileSystem.get(conf) + val remoteKeytabPath = new Path( + remoteFs.getHomeDirectory, System.getenv("SPARK_STAGING_DIR") + Path.SEPARATOR + keytab) + val localFS = FileSystem.getLocal(conf) + // At this point, SparkEnv is likely not initialized, so create a dir, put the keytab there. + val tempDir = Utils.createTempDir() + val localURI = new URI(tempDir.getAbsolutePath + Path.SEPARATOR + keytab) + val qualifiedURI = new URI(localFS.makeQualified(new Path(localURI)).toString) + FileUtil.copy( + remoteFs, remoteKeytabPath, localFS, new Path(qualifiedURI), false, false, conf) + // Get the current credentials, find out when they expire. 
+ val creds = UserGroupInformation.getCurrentUser.getCredentials + val credStream = new ByteArrayOutputStream() + creds.writeTokenStorageToStream(new DataOutputStream(credStream)) + val in = new DataInputStream(new ByteArrayInputStream(credStream.toByteArray)) + val tokenIdentifier = new DelegationTokenIdentifier() + tokenIdentifier.readFields(in) + val timeToRenewal = (0.6 * (tokenIdentifier.getMaxDate - System.currentTimeMillis())).toLong + Executors.newSingleThreadScheduledExecutor(new ThreadFactory { + override def newThread(r: Runnable): Thread = { + val t = new Thread(r) + t.setName("Delegation Token Refresh Thread") + t.setDaemon(true) + t + } + }).scheduleWithFixedDelay(new Runnable { + override def run(): Unit = { + if (!loggedInViaKeytab.get()) { + loginUserFromKeytab(principal, tempDir.getAbsolutePath + Path.SEPARATOR + keytab) + loggedInViaKeytab.set(true) + } + val nns = getNameNodesToAccess(sparkConf) + remoteKeytabPath + val newCredentials = new Credentials() + obtainTokensForNamenodes(nns, conf, newCredentials) + // Now write this out via Akka to executors. + val outputStream = new ByteArrayOutputStream() + newCredentials.writeTokenStorageToStream(new DataOutputStream(outputStream)) + callback(new SerializableBuffer(ByteBuffer.wrap(outputStream.toByteArray))) + } + }, timeToRenewal, timeToRenewal, TimeUnit.MILLISECONDS) + case None => + } + } + + /** + * Get the list of namenodes the user may access. + */ + def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = { + sparkConf.get("spark.yarn.access.namenodes", "") + .split(",") + .map(_.trim()) + .filter(!_.isEmpty) + .map(new Path(_)) + .toSet + } + + def getTokenRenewer(conf: Configuration): String = { + val delegTokenRenewer = Master.getMasterPrincipal(conf) --- End diff -- Hey @harishreedharan , Is this avoidable? This is a `@Private` API and it doesn't seem to exist on hadoop 1.x (which is probably the cause of the build failures).
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org