Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r227072541
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1. When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driver If provided, the driver where to send the newly generated tokens.
    +   *               The same ref will also receive future token updates unless overridden later.
    +   * @return The newly logged in user, or null if a principal is not configured.
    +   */
    +  def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
    --- End diff --
    
    This code has always lacked proper unit tests. In part it's because it's 
hard to tell what a good unit test is here, in part because no one ever 
bothered. It's also lacking integration tests, although I have a few internally 
that I generally run on these changes.
    
    That needs to be looked at separately; adding changes here without knowing 
what a good unit test for this code is would be kinda pointless.
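    
    One angle, if this ever gets tackled: the scheduling math (new tokens at 75% of the renewal interval) is a pure computation that could be unit tested without a KDC if it were factored into a small helper. A hypothetical sketch, assuming a helper named `delayUntilNextRenewal` (not the PR's actual API, just an illustration of the shape):
    
    ```scala
    // Sketch only: a pure helper mirroring the manager's scheduling rule that
    // new tokens are obtained once 75% of the renewal interval has elapsed.
    object RenewalSchedule {
      def delayUntilNextRenewal(issueDateMs: Long, renewalDateMs: Long): Long = {
        val ratio = 0.75
        // Never return a negative delay; an already-expired interval means
        // the renewal should happen immediately.
        math.max(0L, (ratio * (renewalDateMs - issueDateMs)).toLong)
      }
    }
    
    object RenewalScheduleTest {
      def main(args: Array[String]): Unit = {
        // A token issued at t=0 with a 24h renewal interval is renewed after 18h.
        val dayMs = 24L * 60 * 60 * 1000
        assert(RenewalSchedule.delayUntilNextRenewal(0L, dayMs) == (dayMs * 3) / 4)
        // A renewal date in the past yields a zero delay.
        assert(RenewalSchedule.delayUntilNextRenewal(1000L, 500L) == 0L)
        println("ok")
      }
    }
    ```
    
    That wouldn't cover the UGI/relogin side, which is where the integration tests would still be needed.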

