GitHub user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22911#discussion_r233650398
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala ---
    @@ -69,97 +72,191 @@ private[spark] class KerberosConfDriverFeatureStep(
         "If a Kerberos keytab is specified you must also specify a Kerberos 
principal")
     
       KubernetesUtils.requireBothOrNeitherDefined(
    -    existingSecretName,
    -    existingSecretItemKey,
    +    existingDtSecret,
    +    existingDtItemKey,
         "If a secret data item-key where the data of the Kerberos Delegation 
Token is specified" +
           " you must also specify the name of the secret",
         "If a secret storing a Kerberos Delegation Token is specified you must 
also" +
           " specify the item-key where the data is stored")
     
    -  private val hadoopConfigurationFiles = hadoopConfDirSpec.hadoopConfDir.map { hConfDir =>
    -    HadoopBootstrapUtil.getHadoopConfFiles(hConfDir)
    +  if (!hasKerberosConf) {
    +    logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " +
    +      "Make sure that you have the krb5.conf locally on the driver image.")
       }
    -  private val newHadoopConfigMapName =
    -    if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) {
    -      Some(kubernetesConf.hadoopConfigMapName)
    -    } else {
    -      None
    -    }
     
    -  // Either use pre-existing secret or login to create new Secret with DT stored within
    -  private val kerberosConfSpec: Option[KerberosConfigSpec] = (for {
    -    secretName <- existingSecretName
    -    secretItemKey <- existingSecretItemKey
    -  } yield {
    -    KerberosConfigSpec(
    -      dtSecret = None,
    -      dtSecretName = secretName,
    -      dtSecretItemKey = secretItemKey,
    -      jobUserName = kubeTokenManager.getCurrentUser.getShortUserName)
    -  }).orElse(
    -    if (isKerberosEnabled) {
    -      Some(HadoopKerberosLogin.buildSpec(
    -        conf,
    -        kubernetesConf.appResourceNamePrefix,
    -        kubeTokenManager))
    -    } else {
    -      None
    +  // Create delegation tokens if needed. This is a lazy val so that it's not populated
    +  // unnecessarily. But it needs to be accessible to different methods in this class,
    +  // since it's not clear based solely on available configuration options that delegation
    +  // tokens are needed when other credentials are not available.
    +  private lazy val delegationTokens: Array[Byte] = if (keytab.isEmpty && existingDtSecret.isEmpty) {
    +    createDelegationTokens()
    +  } else {
    +    null
    +  }
    +
    +  private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_))
    +
    +  private def dtSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-delegation-tokens"
    +
    +  private def ktSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-kerberos-keytab"
    +
    +  private def hasKerberosConf: Boolean = krb5CMap.isDefined || krb5File.isDefined
    +
    +  override def configurePod(original: SparkPod): SparkPod = {
    +    original.transform { case pod if hasKerberosConf =>
    +      val configMapVolume = if (krb5CMap.isDefined) {
    +        new VolumeBuilder()
    +          .withName(KRB_FILE_VOLUME)
    +          .withNewConfigMap()
    +            .withName(krb5CMap.get)
    +            .endConfigMap()
    +          .build()
    +      } else {
    +        val krb5Conf = new File(krb5File.get)
    +        new VolumeBuilder()
    +          .withName(KRB_FILE_VOLUME)
    +          .withNewConfigMap()
    +          .withName(kubernetesConf.krbConfigMapName)
    +          .withItems(new KeyToPathBuilder()
    +            .withKey(krb5Conf.getName())
    +            .withPath(krb5Conf.getName())
    +            .build())
    +          .endConfigMap()
    +          .build()
    +      }
    +
    +      val podWithVolume = new PodBuilder(pod.pod)
    +        .editSpec()
    +          .addNewVolumeLike(configMapVolume)
    +            .endVolume()
    +          .endSpec()
    +        .build()
    +
    +      val containerWithMount = new ContainerBuilder(pod.container)
    +        .addNewVolumeMount()
    +          .withName(KRB_FILE_VOLUME)
    +          .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf")
    +          .withSubPath("krb5.conf")
    +          .endVolumeMount()
    +        .build()
    +
    +      SparkPod(podWithVolume, containerWithMount)
    +    }.transform {
    +      case pod if needKeytabUpload =>
    +        // If keytab is defined and is a submission-local file (not local: URI), then create a
    +        // secret for it. The keytab data will be stored in this secret below.
    +        val podWithKeytab = new PodBuilder(pod.pod)
    +          .editOrNewSpec()
    +            .addNewVolume()
    +              .withName(KERBEROS_KEYTAB_VOLUME)
    +              .withNewSecret()
    +                .withSecretName(ktSecretName)
    +                .endSecret()
    +              .endVolume()
    +            .endSpec()
    +          .build()
    +
    +        val containerWithKeytab = new ContainerBuilder(pod.container)
    +          .addNewVolumeMount()
    +            .withName(KERBEROS_KEYTAB_VOLUME)
    +            .withMountPath(KERBEROS_KEYTAB_MOUNT_POINT)
    +            .endVolumeMount()
    +          .build()
    +
    +        SparkPod(podWithKeytab, containerWithKeytab)
    +
    +      case pod if existingDtSecret.isDefined || delegationTokens != null =>
    +        val secretName = existingDtSecret.getOrElse(dtSecretName)
    +        val itemKey = existingDtItemKey.getOrElse(KERBEROS_SECRET_KEY)
    +
    +        val podWithTokens = new PodBuilder(pod.pod)
    +          .editOrNewSpec()
    +            .addNewVolume()
    +              .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
    +              .withNewSecret()
    +                .withSecretName(secretName)
    +                .endSecret()
    +              .endVolume()
    +            .endSpec()
    +          .build()
    +
    +        val containerWithTokens = new ContainerBuilder(pod.container)
    +          .addNewVolumeMount()
    +            .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
    +            .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
    +            .endVolumeMount()
    +          .addNewEnv()
    +            .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
    +            .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$itemKey")
    +            .endEnv()
    +          .build()
    +
    +        SparkPod(podWithTokens, containerWithTokens)
         }
    -  )
    -
    -  override def configurePod(pod: SparkPod): SparkPod = {
    -    val hadoopBasedSparkPod = HadoopBootstrapUtil.bootstrapHadoopConfDir(
    -      hadoopConfDirSpec.hadoopConfDir,
    -      newHadoopConfigMapName,
    -      hadoopConfDirSpec.hadoopConfigMapName,
    -      pod)
    -    kerberosConfSpec.map { hSpec =>
    -      HadoopBootstrapUtil.bootstrapKerberosPod(
    -        hSpec.dtSecretName,
    -        hSpec.dtSecretItemKey,
    -        hSpec.jobUserName,
    -        krb5File,
    -        Some(kubernetesConf.krbConfigMapName),
    -        krb5CMap,
    -        hadoopBasedSparkPod)
    -    }.getOrElse(
    -      HadoopBootstrapUtil.bootstrapSparkUserPod(
    -        kubeTokenManager.getCurrentUser.getShortUserName,
    -        hadoopBasedSparkPod))
       }
     
       override def getAdditionalPodSystemProperties(): Map[String, String] = {
    -    val resolvedConfValues = kerberosConfSpec.map { hSpec =>
    -      Map(KERBEROS_DT_SECRET_NAME -> hSpec.dtSecretName,
    -        KERBEROS_DT_SECRET_KEY -> hSpec.dtSecretItemKey,
    -        KERBEROS_SPARK_USER_NAME -> hSpec.jobUserName,
    -      KRB5_CONFIG_MAP_NAME -> krb5CMap.getOrElse(kubernetesConf.krbConfigMapName))
    -      }.getOrElse(
    -        Map(KERBEROS_SPARK_USER_NAME ->
    -          kubeTokenManager.getCurrentUser.getShortUserName))
    -    Map(HADOOP_CONFIG_MAP_NAME ->
    -      hadoopConfDirSpec.hadoopConfigMapName.getOrElse(
    -      kubernetesConf.hadoopConfigMapName)) ++ resolvedConfValues
    +    // If a submission-local keytab is provided, update the Spark config so that it knows the
    +    // path of the keytab in the driver container.
    +    if (needKeytabUpload) {
    +      val ktName = new File(keytab.get).getName()
    +      Map(KEYTAB.key -> s"$KERBEROS_KEYTAB_MOUNT_POINT/$ktName")
    +    } else {
    +      Map.empty
    +    }
       }
     
       override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    -    val hadoopConfConfigMap = for {
    -      hName <- newHadoopConfigMapName
    -      hFiles <- hadoopConfigurationFiles
    -    } yield {
    -      HadoopBootstrapUtil.buildHadoopConfigMap(hName, hFiles)
    +    Seq[HasMetadata]() ++ {
    --- End diff ---
    
    re: the style, same reasoning as the `transform` method I added. It avoids having to create a bunch of local variables and then concatenate them at the end.
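
    For context, here's a minimal, self-contained sketch of the two patterns being referred to; the simplified `Pod` / `SparkPod` types below are stand-ins for illustration, not the actual Spark or Kubernetes model classes:

    ```scala
    // Simplified stand-ins for illustration; the real SparkPod wraps the
    // Kubernetes client's Pod and Container models.
    case class Pod(volumes: List[String] = Nil)

    case class SparkPod(pod: Pod) {
      // The `transform` helper: apply the partial function if it matches,
      // otherwise return the pod unchanged. Conditional modifications can
      // then be chained without declaring a local variable per step.
      def transform(fn: PartialFunction[SparkPod, SparkPod]): SparkPod =
        fn.lift(this).getOrElse(this)
    }

    object TransformSketch extends App {
      val hasKerberosConf = true
      val needKeytabUpload = false

      val configured = SparkPod(Pod())
        .transform { case p if hasKerberosConf =>
          SparkPod(p.pod.copy(volumes = "krb5-conf" :: p.pod.volumes))
        }
        .transform { case p if needKeytabUpload =>
          SparkPod(p.pod.copy(volumes = "keytab" :: p.pod.volumes))
        }

      // The same idea applied to resource creation: build the result Seq
      // inline from conditional blocks, instead of one local val per
      // optional resource concatenated at the end.
      val resources: Seq[String] = Seq[String]() ++ {
        if (hasKerberosConf) Seq("krb5 ConfigMap") else Nil
      } ++ {
        if (needKeytabUpload) Seq("keytab Secret") else Nil
      }

      println(configured.pod.volumes)  // List(krb5-conf)
      println(resources)               // List(krb5 ConfigMap)
    }
    ```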
    
    re: the Util class, it existed because the methods were being called from multiple feature steps. Now only this one step does that work, so the util class became unnecessary.


---
