asfgit closed pull request #23338: [SPARK-25689][yarn] Make driver, not AM, manage delegation tokens.
URL: https://github.com/apache/spark/pull/23338
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index f7e3ddecee093..d97857a39fc21 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -21,7 +21,6 @@ import java.io.File
import java.net.URI
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
-import java.util.concurrent.atomic.AtomicReference
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
@@ -39,32 +38,24 @@ import org.apache.spark.util.ThreadUtils
/**
* Manager for delegation tokens in a Spark application.
*
- * This manager has two modes of operation:
- *
- * 1. When configured with a principal and a keytab, it will make sure long-running apps can run
- * without interruption while accessing secured services. It periodically logs in to the KDC with
- * user-provided credentials, and contacts all the configured secure services to obtain delegation
- * tokens to be distributed to the rest of the application.
- *
- * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
- * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
- * when the relogin is not yet needed. The check period can be overridden in the configuration.
+ * When configured with a principal and a keytab, this manager will make sure long-running apps can
+ * run without interruption while accessing secured services. It periodically logs in to the KDC
+ * with user-provided credentials, and contacts all the configured secure services to obtain
+ * delegation tokens to be distributed to the rest of the application.
*
* New delegation tokens are created once 75% of the renewal interval of the original tokens has
- * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
- * The driver is tasked with distributing the tokens to other processes that might need them.
+ * elapsed. The new tokens are sent to the Spark driver endpoint. The driver is tasked with
+ * distributing the tokens to other processes that might need them.
*
- * 2. When operating without an explicit principal and keytab, token renewal will not be available.
- * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
- * driver, but the app will not get new tokens when those expire.
- *
- * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
- * method. This option does not require calling the `start` method, but leaves it up to the
- * caller to distribute the tokens that were generated.
+ * This class can also be used just to create delegation tokens, by calling the
+ * `obtainDelegationTokens` method. This option does not require calling the `start` method nor
+ * providing a driver reference, but leaves it up to the caller to distribute the tokens that were
+ * generated.
*/
private[spark] class HadoopDelegationTokenManager(
protected val sparkConf: SparkConf,
- protected val hadoopConf: Configuration) extends Logging {
+ protected val hadoopConf: Configuration,
+ protected val schedulerRef: RpcEndpointRef) extends Logging {
private val deprecatedProviderEnabledConfigs = List(
"spark.yarn.security.tokens.%s.enabled",
@@ -85,60 +76,44 @@ private[spark] class HadoopDelegationTokenManager(
s"${delegationTokenProviders.keys.mkString(", ")}.")
private var renewalExecutor: ScheduledExecutorService = _
- private val driverRef = new AtomicReference[RpcEndpointRef]()
-
- /** Set the endpoint used to send tokens to the driver. */
- def setDriverRef(ref: RpcEndpointRef): Unit = {
- driverRef.set(ref)
- }
/** @return Whether delegation token renewal is enabled. */
def renewalEnabled: Boolean = principal != null
/**
- * Start the token renewer. Requires a principal and keytab. Upon start, the renewer will:
+ * Start the token renewer. Requires a principal and keytab. Upon start, the renewer will
+ * obtain delegation tokens for all configured services and send them to the driver, and
+ * set up tasks to periodically get fresh tokens as needed.
*
- * - log in the configured principal, and set up a task to keep that user's ticket renewed
- * - obtain delegation tokens from all available providers
- * - send the tokens to the driver, if it's already registered
- * - schedule a periodic task to update the tokens when needed.
+ * This method requires that a keytab has been provided to Spark, and will try to keep the
+ * logged in user's TGT valid while this manager is active.
*
- * @return The newly logged in user.
+ * @return New set of delegation tokens created for the configured principal.
*/
- def start(): UserGroupInformation = {
+ def start(): Array[Byte] = {
require(renewalEnabled, "Token renewal must be enabled to start the renewer.")
+ require(schedulerRef != null, "Token renewal requires a scheduler endpoint.")
renewalExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
- val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
- val ugi = doLogin()
-
- val tgtRenewalTask = new Runnable() {
- override def run(): Unit = {
- ugi.checkTGTAndReloginFromKeytab()
+ val ugi = UserGroupInformation.getCurrentUser()
+ if (ugi.isFromKeytab()) {
+ // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop 3.x,
+ // it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added in
+ // HADOOP-9567). This task will make sure that the user stays logged in regardless of that
+ // configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if the TGT does
+ // not need to be renewed yet.
+ val tgtRenewalTask = new Runnable() {
+ override def run(): Unit = {
+ ugi.checkTGTAndReloginFromKeytab()
+ }
}
+ val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
+ renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
+ TimeUnit.SECONDS)
}
- val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
- renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
- TimeUnit.SECONDS)
- val creds = obtainTokensAndScheduleRenewal(ugi)
- ugi.addCredentials(creds)
-
- val driver = driverRef.get()
- if (driver != null) {
- val tokens = SparkHadoopUtil.get.serialize(creds)
- driver.send(UpdateDelegationTokens(tokens))
- }
-
- // Transfer the original user's tokens to the new user, since it may contain needed tokens
- // (such as those used to connect to YARN). Explicitly avoid overwriting tokens that already
- // exist in the current user's credentials, since those were freshly obtained above
- // (see SPARK-23361).
- val existing = ugi.getCredentials()
- existing.mergeAll(originalCreds)
- ugi.addCredentials(existing)
- ugi
+ updateTokensTask()
}
def stop(): Unit = {
@@ -218,27 +193,22 @@ private[spark] class HadoopDelegationTokenManager(
* Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
* to fetch the next set of tokens when needed.
*/
- private def updateTokensTask(): Unit = {
+ private def updateTokensTask(): Array[Byte] = {
try {
val freshUGI = doLogin()
val creds = obtainTokensAndScheduleRenewal(freshUGI)
val tokens = SparkHadoopUtil.get.serialize(creds)
- val driver = driverRef.get()
- if (driver != null) {
- logInfo("Updating delegation tokens.")
- driver.send(UpdateDelegationTokens(tokens))
- } else {
- // This shouldn't really happen, since the driver should register way before tokens expire.
- logWarning("Delegation tokens close to expiration but no driver has registered yet.")
- SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
- }
+ logInfo("Updating delegation tokens.")
+ schedulerRef.send(UpdateDelegationTokens(tokens))
+ tokens
} catch {
case e: Exception =>
val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
" If this happens too often tasks will fail.", e)
scheduleRenewal(delay)
+ null
}
}
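To make the new contract concrete, here is a minimal sketch of how a scheduler backend is expected to drive the reworked manager (it mirrors the CoarseGrainedSchedulerBackend change further down; `driverEndpoint`, `sparkConf` and `hadoopConf` are assumed to be in scope and are not part of the diff):

    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.deploy.security.HadoopDelegationTokenManager

    val dtm = new HadoopDelegationTokenManager(sparkConf, hadoopConf, driverEndpoint)
    val tokens: Array[Byte] =
      if (dtm.renewalEnabled) {
        // Keytab mode: logs in, schedules renewal, returns serialized tokens.
        dtm.start()
      } else {
        // One-shot mode: no start() and no scheduler endpoint required.
        val creds = UserGroupInformation.getCurrentUser().getCredentials()
        dtm.obtainDelegationTokens(creds)
        SparkHadoopUtil.get.serialize(creds)
      }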
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
index 90f7051381571..4ca0136424fe1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
@@ -67,11 +67,17 @@ private[spark] class HiveDelegationTokenProvider
// Other modes (such as client with or without keytab, or cluster mode with keytab) do not need
// a delegation token, since there's a valid kerberos TGT for the right user available to the
// driver, which is the only process that connects to the HMS.
- val deployMode = sparkConf.get("spark.submit.deployMode", "client")
- UserGroupInformation.isSecurityEnabled &&
+ //
+ // Note that this means Hive tokens are not re-created periodically by the token manager.
+ // This is because HMS connections are only performed by the Spark driver, and the driver
+ // either has a TGT, in which case it does not need tokens, or it has a token created
+ // elsewhere, in which case it cannot create new ones. The check for an existing token avoids
+ // printing an exception to the logs in the latter case.
+ val currentToken = UserGroupInformation.getCurrentUser().getCredentials().getToken(tokenAlias)
+ currentToken == null && UserGroupInformation.isSecurityEnabled &&
hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty &&
(SparkHadoopUtil.get.isProxyUser(UserGroupInformation.getCurrentUser()) ||
- (deployMode == "cluster" && !sparkConf.contains(KEYTAB)))
+ (!Utils.isClientMode(sparkConf) && !sparkConf.contains(KEYTAB)))
}
override def obtainDelegationTokens(
@@ -98,7 +104,7 @@ private[spark] class HiveDelegationTokenProvider
val hive2Token = new Token[DelegationTokenIdentifier]()
hive2Token.decodeFromUrlString(tokenStr)
logDebug(s"Get Token from hive metastore: ${hive2Token.toString}")
- creds.addToken(new Text("hive.server2.delegation.token"), hive2Token)
+ creds.addToken(tokenAlias, hive2Token)
}
None
@@ -134,4 +140,6 @@ private[spark] class HiveDelegationTokenProvider
case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
}
}
+
+ private def tokenAlias: Text = new Text("hive.server2.delegation.token")
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index e8b7fc0ef100a..9e768c22c17e3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -104,6 +104,9 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterClusterManager(am: RpcEndpointRef) extends CoarseGrainedClusterMessage
+ // Used by YARN's client mode AM to retrieve the current set of delegation tokens.
+ object RetrieveDelegationTokens extends CoarseGrainedClusterMessage
+
// Request executors by specifying the new total number of executors desired
// This includes executors already pending or running
case class RequestExecutors(
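The new message completes a pull-based path for tokens: the driver keeps the latest serialized tokens, and a (re)starting client-mode AM asks for them instead of waiting for a push. Roughly, on the AM side (this mirrors the createAllocator() change in ApplicationMaster.scala below; `driverRef` and `conf` are assumed to be in scope):

    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveDelegationTokens

    // Blocking ask; the driver replies with currentDelegationTokens, which
    // may be null if no tokens have been obtained yet.
    val tokens = driverRef.askSync[Array[Byte]](RetrieveDelegationTokens)
    if (tokens != null) {
      SparkHadoopUtil.get.addDelegationTokens(tokens, conf)
    }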
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 329158a44d369..98ed2fffc0ac5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -162,11 +162,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
case UpdateDelegationTokens(newDelegationTokens) =>
- SparkHadoopUtil.get.addDelegationTokens(newDelegationTokens, conf)
- delegationTokens.set(newDelegationTokens)
- executorDataMap.values.foreach { ed =>
- ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
- }
+ updateDelegationTokens(newDelegationTokens)
case RemoveExecutor(executorId, reason) =>
// We will remove the executor's state and cannot restore it. However, the connection
@@ -404,17 +400,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
driverEndpoint = createDriverEndpointRef(properties)
if (UserGroupInformation.isSecurityEnabled()) {
- delegationTokenManager = createTokenManager()
+ delegationTokenManager = createTokenManager(driverEndpoint)
delegationTokenManager.foreach { dtm =>
- dtm.setDriverRef(driverEndpoint)
- val creds = if (dtm.renewalEnabled) {
- dtm.start().getCredentials()
+ val tokens = if (dtm.renewalEnabled) {
+ dtm.start()
} else {
val creds = UserGroupInformation.getCurrentUser().getCredentials()
dtm.obtainDelegationTokens(creds)
- creds
+ SparkHadoopUtil.get.serialize(creds)
+ }
+ if (tokens != null) {
+ delegationTokens.set(tokens)
}
- delegationTokens.set(SparkHadoopUtil.get.serialize(creds))
}
}
}
@@ -716,8 +713,27 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* Create the delegation token manager to be used for the application. This method is called
* once during the start of the scheduler backend (so after the object has already been
* fully constructed), only if security is enabled in the Hadoop configuration.
+ *
+ * @param schedulerRef RPC endpoint for the scheduler, where updated delegation tokens should be
+ * sent.
*/
- protected def createTokenManager(): Option[HadoopDelegationTokenManager] = None
+ protected def createTokenManager(
+ schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = None
+
+ /**
+ * Called when a new set of delegation tokens is sent to the driver. Child classes can override
+ * this method but should always call this implementation, which handles token distribution to
+ * executors.
+ */
+ protected def updateDelegationTokens(tokens: Array[Byte]): Unit = {
+ SparkHadoopUtil.get.addDelegationTokens(tokens, conf)
+ delegationTokens.set(tokens)
+ executorDataMap.values.foreach { ed =>
+ ed.executorEndpoint.send(UpdateDelegationTokens(tokens))
+ }
+ }
+
+ protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()
}
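The two new protected members define the extension point: a backend that must forward fresh tokens to another process overrides updateDelegationTokens() but still calls super so executors are updated. A sketch of such an override (the YarnClientSchedulerBackend change below is the real instance; `amEndpoint` stands in for whatever extra endpoint the subclass tracks):

    override protected def updateDelegationTokens(tokens: Array[Byte]): Unit = {
      // Let the base class update the current user, the cached tokens, and
      // all registered executors first.
      super.updateDelegationTokens(tokens)
      // Then forward to any extra process that needs the new tokens.
      amEndpoint.foreach(_.send(UpdateDelegationTokens(tokens)))
    }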
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index def9e626a2df2..af7d44b160fef 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -27,7 +27,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
private val hadoopConf = new Configuration()
test("default configuration") {
- val manager = new HadoopDelegationTokenManager(new SparkConf(false), hadoopConf)
+ val manager = new HadoopDelegationTokenManager(new SparkConf(false), hadoopConf, null)
assert(manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(manager.isProviderLoaded("hive"))
@@ -36,7 +36,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
test("disable hive credential provider") {
val sparkConf = new SparkConf(false).set("spark.security.credentials.hive.enabled", "false")
- val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
+ val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, null)
assert(manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(!manager.isProviderLoaded("hive"))
@@ -47,7 +47,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
val sparkConf = new SparkConf(false)
.set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
.set("spark.yarn.security.credentials.hive.enabled", "false")
- val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
+ val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, null)
assert(!manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(!manager.isProviderLoaded("hive"))
@@ -99,7 +99,7 @@ private object NoHiveTest {
def runTest(): Unit = {
try {
- val manager = new HadoopDelegationTokenManager(new SparkConf(), new Configuration())
+ val manager = new HadoopDelegationTokenManager(new SparkConf(), new Configuration(), null)
require(!manager.isProviderLoaded("hive"))
} catch {
case e: Throwable =>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
index 721d7e97b21f8..a77e8d4dbcff2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
@@ -91,7 +91,7 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
private lazy val delegationTokens: Array[Byte] = {
if (keytab.isEmpty && existingSecretName.isEmpty) {
val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf,
- SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf))
+ SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf), null)
val creds = UserGroupInformation.getCurrentUser().getCredentials()
tokenManager.obtainDelegationTokens(creds)
// If no tokens and no secrets are stored in the credentials, make sure nothing is returned,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index cd298971e02a7..e285e202a1488 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -26,7 +26,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -147,8 +147,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
}
- override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
- Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
+ override protected def createTokenManager(
+ schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = {
+ Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, schedulerRef))
}
private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index f5866651dc90b..d3e9d15d38023 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -37,7 +37,7 @@ import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
-import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.rpc.{RpcEndpointAddress, RpcEndpointRef}
import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
@@ -774,8 +774,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
}
- override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
- Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
+ override protected def createTokenManager(
+ schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = {
+ Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, schedulerRef))
}
private def numExecutors(): Int = {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e46c4f970c4a3..fcc9679399822 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -30,6 +30,7 @@ import scala.util.control.NonFatal
import org.apache.commons.lang3.{StringUtils => ComStrUtils}
import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
@@ -41,7 +42,6 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.MetricsSystem
@@ -58,6 +58,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
+ private val appAttemptId = YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
private val isClusterMode = args.userClass != null
private val sparkConf = new SparkConf()
@@ -99,25 +100,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}
}
- private val tokenManager: Option[YARNHadoopDelegationTokenManager] = {
- sparkConf.get(KEYTAB).map { _ =>
- new YARNHadoopDelegationTokenManager(sparkConf, yarnConf)
- }
- }
-
- private val ugi = tokenManager match {
- case Some(tm) =>
- // Set the context class loader so that the token renewer has access to jars distributed
- // by the user.
- Utils.withContextClassLoader(userClassLoader) {
- tm.start()
- }
-
- case _ =>
- SparkHadoopUtil.get.createSparkUser()
- }
-
- private val client = doAsUser { new YarnRMClient() }
+ private val client = new YarnRMClient()
// Default to twice the number of executors (twice the maximum number of executors if dynamic
// allocation is enabled), with a minimum of 3.
@@ -174,11 +157,19 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
// In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
private val sparkContextPromise = Promise[SparkContext]()
- // Load the list of localized files set by the client. This is used when launching executors,
- // and is loaded here so that these configs don't pollute the Web UI's environment page in
- // cluster mode.
- private val localResources = doAsUser {
+ /**
+ * Load the list of localized files set by the client, used when launching executors. This should
+ * be called in a context where the needed credentials to access HDFS are available.
+ */
+ private def prepareLocalResources(): Map[String, LocalResource] = {
logInfo("Preparing Local resources")
+ val distCacheConf = new SparkConf(false)
+ if (args.distCacheConf != null) {
+ Utils.getPropertiesFromFile(args.distCacheConf).foreach { case (k, v) =>
+ distCacheConf.set(k, v)
+ }
+ }
+
val resources = HashMap[String, LocalResource]()
def setupDistributedCache(
@@ -199,11 +190,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
resources(fileName) = amJarRsrc
}
- val distFiles = sparkConf.get(CACHED_FILES)
- val fileSizes = sparkConf.get(CACHED_FILES_SIZES)
- val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS)
- val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES)
- val resTypes = sparkConf.get(CACHED_FILES_TYPES)
+ val distFiles = distCacheConf.get(CACHED_FILES)
+ val fileSizes = distCacheConf.get(CACHED_FILES_SIZES)
+ val timeStamps = distCacheConf.get(CACHED_FILES_TIMESTAMPS)
+ val visibilities = distCacheConf.get(CACHED_FILES_VISIBILITIES)
+ val resTypes = distCacheConf.get(CACHED_FILES_TYPES)
for (i <- 0 to distFiles.size - 1) {
val resType = LocalResourceType.valueOf(resTypes(i))
@@ -212,7 +203,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}
// Distribute the conf archive to executors.
- sparkConf.get(CACHED_CONF_ARCHIVE).foreach { path =>
+ distCacheConf.get(CACHED_CONF_ARCHIVE).foreach { path =>
val uri = new URI(path)
val fs = FileSystem.get(uri, yarnConf)
val status = fs.getFileStatus(new Path(uri))
@@ -225,33 +216,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
LocalResourceVisibility.PRIVATE.name())
}
- // Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy).
- CACHE_CONFIGS.foreach { e =>
- sparkConf.remove(e)
- sys.props.remove(e.key)
- }
-
resources.toMap
}
- def getAttemptId(): ApplicationAttemptId = {
- client.getAttemptId()
- }
-
final def run(): Int = {
- doAsUser {
- runImpl()
- }
- exitCode
- }
-
- private def runImpl(): Unit = {
try {
- val appAttemptId = client.getAttemptId()
-
- var attemptID: Option[String] = None
-
- if (isClusterMode) {
+ val attemptID = if (isClusterMode) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")
@@ -264,7 +234,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
// configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
- attemptID = Option(appAttemptId.getAttemptId.toString)
+ Option(appAttemptId.getAttemptId.toString)
+ } else {
+ None
}
new CallerContext(
@@ -277,7 +249,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
ShutdownHookManager.addShutdownHook(priority) { () =>
val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
- val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+ val isLastAttempt = appAttemptId.getAttemptId() >= maxAppAttempts
if (!finished) {
// The default state of ApplicationMaster is failed if it is invoked by shut down hook.
@@ -322,6 +294,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
logWarning("Exception during stopping of the metric system: ", e)
}
}
+
+ exitCode
}
/**
@@ -377,9 +351,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
logDebug("shutting down user thread")
userClassThread.interrupt()
}
- if (!inShutdown) {
- tokenManager.foreach(_.stop())
- }
}
}
}
@@ -405,8 +376,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
port: Int,
_sparkConf: SparkConf,
uiAddress: Option[String]): Unit = {
- val appId = client.getAttemptId().getApplicationId().toString()
- val attemptId = client.getAttemptId().getAttemptId().toString()
+ val appId = appAttemptId.getApplicationId().toString()
+ val attemptId = appAttemptId.getAttemptId().toString()
val historyAddress = ApplicationMaster
.getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)
@@ -415,9 +386,20 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}
private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
- val appId = client.getAttemptId().getApplicationId().toString()
+ // In client mode, the AM may be restarting after delegation tokens have reached their TTL. So
+ // always contact the driver to get the current set of valid tokens, so that local resources can
+ // be initialized below.
+ if (!isClusterMode) {
+ val tokens = driverRef.askSync[Array[Byte]](RetrieveDelegationTokens)
+ if (tokens != null) {
+ SparkHadoopUtil.get.addDelegationTokens(tokens, _sparkConf)
+ }
+ }
+
+ val appId = appAttemptId.getApplicationId().toString()
val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+ val localResources = prepareLocalResources()
// Before we initialize the allocator, let's log the information about how executors will
// be run up front, to avoid printing this out for every single executor being launched.
@@ -433,13 +415,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
allocator = client.createAllocator(
yarnConf,
_sparkConf,
+ appAttemptId,
driverUrl,
driverRef,
securityMgr,
localResources)
- tokenManager.foreach(_.setDriverRef(driverRef))
-
// Initialize the AM endpoint *after* the allocator has been initialized. This ensures
// that when the driver sends an initial executor request (e.g. after an AM restart),
// the allocator is ready to service requests.
@@ -755,6 +736,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
case None =>
logWarning("Container allocator is not ready to find executor loss reasons yet.")
}
+
+ case UpdateDelegationTokens(tokens) =>
+ SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -767,12 +751,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}
}
- private def doAsUser[T](fn: => T): T = {
- ugi.doAs(new PrivilegedExceptionAction[T]() {
- override def run: T = fn
- })
- }
-
}
object ApplicationMaster extends Logging {
@@ -793,7 +771,24 @@ object ApplicationMaster extends Logging {
SignalUtils.registerLogger(log)
val amArgs = new ApplicationMasterArguments(args)
master = new ApplicationMaster(amArgs)
- System.exit(master.run())
+
+ val ugi = master.sparkConf.get(PRINCIPAL) match {
+ case Some(principal) =>
+ val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
+ SparkHadoopUtil.get.loginUserFromKeytab(principal, master.sparkConf.get(KEYTAB).orNull)
+ val newUGI = UserGroupInformation.getCurrentUser()
+ // Transfer the original user's tokens to the new user, since it may contain needed tokens
+ // (such as those used to connect to YARN).
+ newUGI.addCredentials(originalCreds)
+ newUGI
+
+ case _ =>
+ SparkHadoopUtil.get.createSparkUser()
+ }
+
+ ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+ override def run(): Unit = System.exit(master.run())
+ })
}
private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
@@ -801,7 +796,7 @@ object ApplicationMaster extends Logging {
}
private[spark] def getAttemptId(): ApplicationAttemptId = {
- master.getAttemptId
+ master.appAttemptId
}
private[spark] def getHistoryServerAddress(
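Net effect of the ApplicationMaster changes: the keytab login and doAs now happen once in main(), and the AM no longer owns a token manager. A condensed sketch of the new startup sequence, simplified from the main() hunk above (imports of PrivilegedExceptionAction and UserGroupInformation elided):

    val ugi = master.sparkConf.get(PRINCIPAL) match {
      case Some(principal) =>
        // Log in from the keytab and carry over pre-existing tokens
        // (e.g. those used to connect to YARN).
        val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
        SparkHadoopUtil.get.loginUserFromKeytab(principal, master.sparkConf.get(KEYTAB).orNull)
        val newUGI = UserGroupInformation.getCurrentUser()
        newUGI.addCredentials(originalCreds)
        newUGI
      case _ =>
        SparkHadoopUtil.get.createSparkUser()
    }
    // Everything, including the RM client and local resource setup, now runs
    // inside this single doAs context.
    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = System.exit(master.run())
    })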
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index cc76a7c8f13f5..c10206c847271 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -26,6 +26,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
var primaryRFile: String = null
var userArgs: Seq[String] = Nil
var propertiesFile: String = null
+ var distCacheConf: String = null
parseArgs(args.toList)
@@ -62,6 +63,10 @@ class ApplicationMasterArguments(val args: Array[String]) {
propertiesFile = value
args = tail
+ case ("--dist-cache-conf") :: value :: tail =>
+ distCacheConf = value
+ args = tail
+
case _ =>
printUsageAndExit(1, args)
}
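The new flag ties into the conf archive split done in Client.scala below: cache-related entries now live in a separate properties file that only the AM reads, so they never pollute sparkConf or the Web UI environment page. Consumption on the AM side is then just (as in prepareLocalResources() above):

    val distCacheConf = new SparkConf(false)
    if (args.distCacheConf != null) {
      // Load the dist-cache entries into a throwaway conf, not sparkConf.
      Utils.getPropertiesFromFile(args.distCacheConf).foreach { case (k, v) =>
        distCacheConf.set(k, v)
      }
    }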
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 184fb6a8ad13e..4f5cf7760dbf1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -99,21 +99,19 @@ private[spark] class Client(
}
private val distCacheMgr = new ClientDistributedCacheManager()
+ private val cachedResourcesConf = new SparkConf(false)
- private val principal = sparkConf.get(PRINCIPAL).orNull
private val keytab = sparkConf.get(KEYTAB).orNull
- private val loginFromKeytab = principal != null
- private val amKeytabFileName: String = {
+ private val amKeytabFileName: Option[String] = if (keytab != null && isClusterMode) {
+ val principal = sparkConf.get(PRINCIPAL).orNull
require((principal == null) == (keytab == null),
"Both principal and keytab must be defined, or neither.")
- if (loginFromKeytab) {
- logInfo(s"Kerberos credentials: principal = $principal, keytab =
$keytab")
- // Generate a file name that can be used for the keytab file, that does
not conflict
- // with any user file.
- new File(keytab).getName() + "-" + UUID.randomUUID().toString
- } else {
- null
- }
+ logInfo(s"Kerberos credentials: principal = $principal, keytab = $keytab")
+ // Generate a file name that can be used for the keytab file, that does not conflict
+ // with any user file.
+ Some(new File(keytab).getName() + "-" + UUID.randomUUID().toString)
+ } else {
+ None
}
require(keytab == null || !Utils.isLocalUri(keytab), "Keytab should reference a local file.")
@@ -219,16 +217,7 @@ private[spark] class Client(
}
}
- if (isClusterMode && principal != null && keytab != null) {
- val newUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
- newUgi.doAs(new PrivilegedExceptionAction[Unit] {
- override def run(): Unit = {
- cleanupStagingDirInternal()
- }
- })
- } else {
- cleanupStagingDirInternal()
- }
+ cleanupStagingDirInternal()
}
/**
@@ -311,7 +300,7 @@ private[spark] class Client(
*/
private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
- val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
+ val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null)
credentialManager.obtainDelegationTokens(credentials)
// When using a proxy user, copy the delegation tokens to the user's credentials. Avoid
@@ -495,11 +484,11 @@ private[spark] class Client(
// If we passed in a keytab, make sure we copy the keytab to the staging directory on
// HDFS, and setup the relevant environment vars, so the AM can login again.
- if (loginFromKeytab) {
+ amKeytabFileName.foreach { kt =>
logInfo("To enable the AM to login from keytab, credentials are being
copied over to the AM" +
" via the YARN Secure Distributed Cache.")
val (_, localizedPath) = distribute(keytab,
- destName = Some(amKeytabFileName),
+ destName = Some(kt),
appMasterOnly = true)
require(localizedPath != null, "Keytab file already distributed.")
}
@@ -635,7 +624,7 @@ private[spark] class Client(
// Update the configuration with all the distributed files, minus the conf archive. The
// conf archive will be handled by the AM differently so that we avoid having to send
// this configuration by other means. See SPARK-14602 for one reason of why this is needed.
- distCacheMgr.updateConfiguration(sparkConf)
+ distCacheMgr.updateConfiguration(cachedResourcesConf)
// Upload the conf archive to HDFS manually, and record its location in the configuration.
// This will allow the AM to know where the conf archive is in HDFS, so that it can be
@@ -647,7 +636,7 @@ private[spark] class Client(
// system.
val remoteConfArchivePath = new Path(destDir, LOCALIZED_CONF_ARCHIVE)
val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf)
- sparkConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())
+ cachedResourcesConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())
val localConfArchive = new Path(createConfArchive().toURI())
copyFileToRemote(destDir, localConfArchive, replication, symlinkCache, force = true,
@@ -659,11 +648,6 @@ private[spark] class Client(
remoteFs, hadoopConf, remoteConfArchivePath, localResources, LocalResourceType.ARCHIVE,
LOCALIZED_CONF_DIR, statCache, appMasterOnly = false)
- // Clear the cache-related entries from the configuration to avoid them polluting the
- // UI's environment page. This works for client mode; for cluster mode, this is handled
- // by the AM.
- CACHE_CONFIGS.foreach(sparkConf.remove)
-
localResources
}
@@ -767,19 +751,25 @@ private[spark] class Client(
hadoopConf.writeXml(confStream)
confStream.closeEntry()
- // Save Spark configuration to a file in the archive, but filter out the app's secret.
- val props = new Properties()
- sparkConf.getAll.foreach { case (k, v) =>
- props.setProperty(k, v)
+ // Save Spark configuration to a file in the archive.
+ val props = confToProperties(sparkConf)
+
+ // If propagating the keytab to the AM, override the keytab name with the name of the
+ // distributed file. Otherwise remove principal/keytab from the conf, so they're not seen
+ // by the AM at all.
+ amKeytabFileName match {
+ case Some(kt) =>
+ props.setProperty(KEYTAB.key, kt)
+ case None =>
+ props.remove(PRINCIPAL.key)
+ props.remove(KEYTAB.key)
}
- // Override spark.yarn.key to point to the location in distributed cache which will be used
- // by AM.
- Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) }
- confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE))
- val writer = new OutputStreamWriter(confStream, StandardCharsets.UTF_8)
- props.store(writer, "Spark configuration.")
- writer.flush()
- confStream.closeEntry()
+
+ writePropertiesToArchive(props, SPARK_CONF_FILE, confStream)
+
+ // Write the distributed cache config to the archive.
+ writePropertiesToArchive(confToProperties(cachedResourcesConf), DIST_CACHE_CONF_FILE,
+ confStream)
} finally {
confStream.close()
}
@@ -983,7 +973,10 @@ private[spark] class Client(
}
val amArgs =
Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
- Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
+ Seq("--properties-file",
+ buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
+ Seq("--dist-cache-conf",
+ buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))
// Command for the ApplicationMaster
val commands = prefixEnv ++
@@ -1212,6 +1205,9 @@ private object Client extends Logging {
// Name of the file in the conf archive containing Spark configuration.
val SPARK_CONF_FILE = "__spark_conf__.properties"
+ // Name of the file in the conf archive containing the distributed cache info.
+ val DIST_CACHE_CONF_FILE = "__spark_dist_cache__.properties"
+
// Subdirectory where the user's python files (not archives) will be placed.
val LOCALIZED_PYTHON_DIR = "__pyfiles__"
@@ -1511,6 +1507,22 @@ private object Client extends Logging {
}
getClusterPath(conf, cmdPrefix)
}
+
+ def confToProperties(conf: SparkConf): Properties = {
+ val props = new Properties()
+ conf.getAll.foreach { case (k, v) =>
+ props.setProperty(k, v)
+ }
+ props
+ }
+
+ def writePropertiesToArchive(props: Properties, name: String, out: ZipOutputStream): Unit = {
+ out.putNextEntry(new ZipEntry(name))
+ val writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)
+ props.store(writer, "Spark configuration.")
+ writer.flush()
+ out.closeEntry()
+ }
}
private[spark] class YarnClusterApplication extends SparkApplication {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 05a7b1e1310c4..cf16edf16c034 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -76,12 +76,13 @@ private[spark] class YarnRMClient extends Logging {
def createAllocator(
conf: YarnConfiguration,
sparkConf: SparkConf,
+ appAttemptId: ApplicationAttemptId,
driverUrl: String,
driverRef: RpcEndpointRef,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]): YarnAllocator = {
require(registered, "Must register AM before creating allocator.")
- new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
+ new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, appAttemptId, securityMgr,
localResources, new SparkRackResolver())
}
@@ -100,11 +101,6 @@ private[spark] class YarnRMClient extends Logging {
}
}
- /** Returns the attempt ID. */
- def getAttemptId(): ApplicationAttemptId = {
- YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
- }
-
/** Returns the configuration for the AmIpFilter to add to the Spark UI. */
def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = {
// Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index b257d8fdd3b1a..a9e5bfab4e6df 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -325,16 +325,6 @@ package object config {
.stringConf
.createOptional
- // The list of cache-related config entries. This is used by Client and the AM to clean
- // up the environment so that these settings do not appear on the web UI.
- private[yarn] val CACHE_CONFIGS = Seq(
- CACHED_FILES,
- CACHED_FILES_SIZES,
- CACHED_FILES_TIMESTAMPS,
- CACHED_FILES_VISIBILITIES,
- CACHED_FILES_TYPES,
- CACHED_CONF_ARCHIVE)
-
/* YARN allocator-level blacklisting related config entries. */
private[spark] val YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED =
ConfigBuilder("spark.yarn.blacklist.executor.launch.blacklisting.enabled")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
index 2d9a3f0c83fd2..bb40ea8015198 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
@@ -36,10 +36,11 @@ import org.apache.spark.util.Utils
* [[ServiceCredentialProvider]] interface, as well as the builtin providers defined
* in [[HadoopDelegationTokenManager]].
*/
-private[yarn] class YARNHadoopDelegationTokenManager(
+private[spark] class YARNHadoopDelegationTokenManager(
_sparkConf: SparkConf,
- _hadoopConf: Configuration)
- extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) {
+ _hadoopConf: Configuration,
+ _schedulerRef: RpcEndpointRef)
+ extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf, _schedulerRef) {
private val credentialProviders = {
ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 9397a1e3de9ac..935cda3c4be4b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -27,6 +27,7 @@ import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkAppHandle
import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
private[spark] class YarnClientSchedulerBackend(
scheduler: TaskSchedulerImpl,
@@ -166,4 +167,9 @@ private[spark] class YarnClientSchedulerBackend(
logInfo("Stopped")
}
+ override protected def updateDelegationTokens(tokens: Array[Byte]): Unit = {
+ super.updateDelegationTokens(tokens)
+ amEndpoint.foreach(_.send(UpdateDelegationTokens(tokens)))
+ }
+
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 1289d4be79ea4..6357d4adbcd99 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -27,6 +27,8 @@ import scala.util.control.NonFatal
import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
import org.apache.spark.SparkContext
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
@@ -55,6 +57,7 @@ private[spark] abstract class YarnSchedulerBackend(
protected var totalExpectedExecutors = 0
private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
+ protected var amEndpoint: Option[RpcEndpointRef] = None
private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint)
@@ -191,6 +194,11 @@ private[spark] abstract class YarnSchedulerBackend(
sc.executorAllocationManager.foreach(_.reset())
}
+ override protected def createTokenManager(
+ schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = {
+ Some(new YARNHadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, schedulerRef))
+ }
+
/**
* Override the DriverEndpoint to add extra logic for the case when an executor is disconnected.
* This endpoint communicates with the executors and queries the AM for an executor's exit
@@ -226,7 +234,6 @@ private[spark] abstract class YarnSchedulerBackend(
*/
private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
extends ThreadSafeRpcEndpoint with Logging {
- private var amEndpoint: Option[RpcEndpointRef] = None
private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
executorId: String,
@@ -266,11 +273,6 @@ private[spark] abstract class YarnSchedulerBackend(
logWarning(s"Requesting driver to remove executor $executorId for
reason $reason")
driverEndpoint.send(r)
}
-
- case u @ UpdateDelegationTokens(tokens) =>
- // Add the tokens to the current user and send a message to the scheduler so that it
- // notifies all registered executors of the new tokens.
- driverEndpoint.send(u)
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -304,6 +306,9 @@ private[spark] abstract class YarnSchedulerBackend(
case RetrieveLastAllocatedExecutorId =>
context.reply(currentExecutorIdCounter)
+
+ case RetrieveDelegationTokens =>
+ context.reply(currentDelegationTokens)
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
index 98315e4235741..f00453cb9c597 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
@@ -34,7 +34,7 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite {
}
test("Correctly loads credential providers") {
- credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
+ credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null)
assert(credentialManager.isProviderLoaded("yarn-test"))
}
}