Github user jerryshao commented on a diff in the pull request:
https://github.com/apache/spark/pull/20657#discussion_r173380826
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---
@@ -18,221 +18,160 @@ package org.apache.spark.deploy.yarn.security
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.ui.UIUtils
import org.apache.spark.util.ThreadUtils
/**
- * The following methods are primarily meant to make sure long-running apps like Spark
- * Streaming apps can run without interruption while accessing secured services. The
- * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
- * This method wakes up a thread that logs into the KDC
- * once 75% of the renewal interval of the original credentials used for the container
- * has elapsed. It then obtains new credentials and writes them to HDFS in a
- * pre-specified location - the prefix of which is specified in the sparkConf by
- * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
- * - each update goes to a new file, with a monotonically increasing suffix), also the
- * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
- * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
+ * A manager tasked with periodically updating delegation tokens needed by the application.
 *
- * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
- * called once 80% of the validity of the original credentials has elapsed. At that time the
- * executor finds the credentials file with the latest timestamp and checks if it has read those
- * credentials before (by keeping track of the suffix of the last file it read). If a new file has
- * appeared, it will read the credentials and update the currently running UGI with it. This
- * process happens again once 80% of the validity of this has expired.
+ * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
+ * without interruption while accessing secured services. It periodically logs in to the KDC with
+ * user-provided credentials, and contacts all the configured secure services to obtain delegation
+ * tokens to be distributed to the rest of the application.
+ *
+ * This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API
+ * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
+ * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
+ * needed. The check period can be overridden in the configuration.
+ *
+ * New delegation tokens are created once 75% of the renewal interval of the original tokens has
+ * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
+ * The driver is tasked with distributing the tokens to other processes that might need them.
*/
private[yarn] class AMCredentialRenewer(
sparkConf: SparkConf,
- hadoopConf: Configuration,
- credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
+ hadoopConf: Configuration) extends Logging {
- private var lastCredentialsFileSuffix = 0
+ private val principal = sparkConf.get(PRINCIPAL).get
+ private val keytab = sparkConf.get(KEYTAB).get
+ private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
- private val credentialRenewerThread: ScheduledExecutorService =
+ private val renewalExecutor: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
- private val hadoopUtil = SparkHadoopUtil.get
+ private val driverRef = new AtomicReference[RpcEndpointRef]()
- private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
- private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
- private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
- private val freshHadoopConf =
--- End diff --
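For readers skimming the excerpt: the new Scaladoc above describes the "create new tokens once 75% of the renewal interval has elapsed" policy only in prose. Below is a minimal sketch of that scheduling pattern under stated assumptions; it is not the PR's actual code, and `scheduleRenewal`/`obtainTokens` are illustrative names (a plain `ScheduledExecutorService` is used instead of Spark's internal ThreadUtils helper).

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Sketch of the "renew at 75% of the interval" scheduling described in the Scaladoc.
// obtainTokens is a stand-in for contacting the secure services; it returns the tokens'
// renewal deadline as epoch milliseconds.
object RenewalSketch {
  private val renewalExecutor = Executors.newSingleThreadScheduledExecutor()

  def scheduleRenewal(obtainTokens: () => Long): Unit = {
    val nextRenewalTime = obtainTokens()
    // Wait until 75% of the time to the renewal deadline has elapsed; never use a negative delay.
    val delay = math.max(0L, ((nextRenewalTime - System.currentTimeMillis()) * 0.75).toLong)
    renewalExecutor.schedule(new Runnable {
      override def run(): Unit = scheduleRenewal(obtainTokens)
    }, delay, TimeUnit.MILLISECONDS)
  }
}
```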
Why don't we need to create a new Hadoop configuration with the FS cache disabled here anymore?
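For context on what "disabling the fs cache" usually refers to (the `freshHadoopConf` line is truncated in the excerpt, so this is the common pattern, not necessarily the removed code): a copy of the Hadoop configuration is made with the per-scheme `fs.<scheme>.impl.disable.cache` flag set, so `FileSystem.get()` returns a fresh, uncached instance. The method name below is hypothetical.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustration only: build a copy of the Hadoop configuration that disables the FileSystem
// cache for a given path's scheme.
def confBypassingFsCache(hadoopConf: Configuration, path: Path): Configuration = {
  val scheme = path.toUri.getScheme               // e.g. "hdfs"
  val freshConf = new Configuration(hadoopConf)   // copy, so the original conf is untouched
  freshConf.setBoolean(s"fs.$scheme.impl.disable.cache", true)
  freshConf
}

// Usage: FileSystem.get(path.toUri, confBypassingFsCache(hadoopConf, path))
```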
---