Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22911#discussion_r233667509

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala ---
@@ -69,97 +72,191 @@ private[spark] class KerberosConfDriverFeatureStep(
     "If a Kerberos keytab is specified you must also specify a Kerberos principal")
 
   KubernetesUtils.requireBothOrNeitherDefined(
-    existingSecretName,
-    existingSecretItemKey,
+    existingDtSecret,
+    existingDtItemKey,
     "If a secret data item-key where the data of the Kerberos Delegation Token is specified" +
       " you must also specify the name of the secret",
     "If a secret storing a Kerberos Delegation Token is specified you must also" +
       " specify the item-key where the data is stored")
 
-  private val hadoopConfigurationFiles = hadoopConfDirSpec.hadoopConfDir.map { hConfDir =>
-    HadoopBootstrapUtil.getHadoopConfFiles(hConfDir)
+  if (!hasKerberosConf) {
+    logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " +
+      "Make sure that you have the krb5.conf locally on the driver image.")
   }
-  private val newHadoopConfigMapName =
-    if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) {
-      Some(kubernetesConf.hadoopConfigMapName)
-    } else {
-      None
-    }
 
-  // Either use pre-existing secret or login to create new Secret with DT stored within
-  private val kerberosConfSpec: Option[KerberosConfigSpec] = (for {
-    secretName <- existingSecretName
-    secretItemKey <- existingSecretItemKey
-  } yield {
-    KerberosConfigSpec(
-      dtSecret = None,
-      dtSecretName = secretName,
-      dtSecretItemKey = secretItemKey,
-      jobUserName = kubeTokenManager.getCurrentUser.getShortUserName)
-  }).orElse(
-    if (isKerberosEnabled) {
-      Some(HadoopKerberosLogin.buildSpec(
-        conf,
-        kubernetesConf.appResourceNamePrefix,
-        kubeTokenManager))
-    } else {
-      None
+  // Create delegation tokens if needed. This is a lazy val so that it's not populated
+  // unnecessarily. But it needs to be accessible to different methods in this class,
+  // since it's not clear based solely on available configuration options that delegation
+  // tokens are needed when other credentials are not available.
+  private lazy val delegationTokens: Array[Byte] = if (keytab.isEmpty && existingDtSecret.isEmpty) {
+    createDelegationTokens()
+  } else {
+    null
+  }
+
+  private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_))
+
+  private def dtSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-delegation-tokens"
+
+  private def ktSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-kerberos-keytab"
+
+  private def hasKerberosConf: Boolean = krb5CMap.isDefined || krb5File.isDefined
+
+  override def configurePod(original: SparkPod): SparkPod = {
+    original.transform { case pod if hasKerberosConf =>
+      val configMapVolume = if (krb5CMap.isDefined) {
+        new VolumeBuilder()
+          .withName(KRB_FILE_VOLUME)
+          .withNewConfigMap()
+            .withName(krb5CMap.get)
+            .endConfigMap()
+          .build()
+      } else {
+        val krb5Conf = new File(krb5File.get)
+        new VolumeBuilder()
+          .withName(KRB_FILE_VOLUME)
+          .withNewConfigMap()
+            .withName(kubernetesConf.krbConfigMapName)
+            .withItems(new KeyToPathBuilder()
+              .withKey(krb5Conf.getName())
+              .withPath(krb5Conf.getName())
+              .build())
+            .endConfigMap()
+          .build()
+      }
+
+      val podWithVolume = new PodBuilder(pod.pod)
+        .editSpec()
+          .addNewVolumeLike(configMapVolume)
+            .endVolume()
+          .endSpec()
+        .build()
+
+      val containerWithMount = new ContainerBuilder(pod.container)
+        .addNewVolumeMount()
+          .withName(KRB_FILE_VOLUME)
+          .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf")
+          .withSubPath("krb5.conf")
+          .endVolumeMount()
+        .build()
+
+      SparkPod(podWithVolume, containerWithMount)
+    }.transform {
+      case pod if needKeytabUpload =>
+        // If keytab is defined and is a submission-local file (not local: URI), then create a
+        // secret for it. The keytab data will be stored in this secret below.
+        val podWithKeytab = new PodBuilder(pod.pod)
+          .editOrNewSpec()
+            .addNewVolume()
+              .withName(KERBEROS_KEYTAB_VOLUME)
+              .withNewSecret()
+                .withSecretName(ktSecretName)
+                .endSecret()
+              .endVolume()
+            .endSpec()
+          .build()
+
+        val containerWithKeytab = new ContainerBuilder(pod.container)
+          .addNewVolumeMount()
+            .withName(KERBEROS_KEYTAB_VOLUME)
+            .withMountPath(KERBEROS_KEYTAB_MOUNT_POINT)
+            .endVolumeMount()
+          .build()
+
+        SparkPod(podWithKeytab, containerWithKeytab)
+
+      case pod if existingDtSecret.isDefined || delegationTokens != null =>
+        val secretName = existingDtSecret.getOrElse(dtSecretName)
+        val itemKey = existingDtItemKey.getOrElse(KERBEROS_SECRET_KEY)
+
+        val podWithTokens = new PodBuilder(pod.pod)
+          .editOrNewSpec()
+            .addNewVolume()
+              .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+              .withNewSecret()
+                .withSecretName(secretName)
+                .endSecret()
+              .endVolume()
+            .endSpec()
+          .build()
+
+        val containerWithTokens = new ContainerBuilder(pod.container)
+          .addNewVolumeMount()
+            .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+            .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
+            .endVolumeMount()
+          .addNewEnv()
+            .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
+            .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$itemKey")
+            .endEnv()
+          .build()
+
+        SparkPod(podWithTokens, containerWithTokens)
     }
-  )
-
-  override def configurePod(pod: SparkPod): SparkPod = {
-    val hadoopBasedSparkPod = HadoopBootstrapUtil.bootstrapHadoopConfDir(
-      hadoopConfDirSpec.hadoopConfDir,
-      newHadoopConfigMapName,
-      hadoopConfDirSpec.hadoopConfigMapName,
-      pod)
-    kerberosConfSpec.map { hSpec =>
-      HadoopBootstrapUtil.bootstrapKerberosPod(
-        hSpec.dtSecretName,
-        hSpec.dtSecretItemKey,
-        hSpec.jobUserName,
-        krb5File,
-        Some(kubernetesConf.krbConfigMapName),
-        krb5CMap,
-        hadoopBasedSparkPod)
-    }.getOrElse(
-      HadoopBootstrapUtil.bootstrapSparkUserPod(
-        kubeTokenManager.getCurrentUser.getShortUserName,
-        hadoopBasedSparkPod))
   }
 
   override def getAdditionalPodSystemProperties(): Map[String, String] = {
-    val resolvedConfValues = kerberosConfSpec.map { hSpec =>
-      Map(KERBEROS_DT_SECRET_NAME -> hSpec.dtSecretName,
-        KERBEROS_DT_SECRET_KEY -> hSpec.dtSecretItemKey,
-        KERBEROS_SPARK_USER_NAME -> hSpec.jobUserName,
-        KRB5_CONFIG_MAP_NAME -> krb5CMap.getOrElse(kubernetesConf.krbConfigMapName))
-    }.getOrElse(
-      Map(KERBEROS_SPARK_USER_NAME ->
-        kubeTokenManager.getCurrentUser.getShortUserName))
-    Map(HADOOP_CONFIG_MAP_NAME ->
-      hadoopConfDirSpec.hadoopConfigMapName.getOrElse(
-        kubernetesConf.hadoopConfigMapName)) ++ resolvedConfValues
+    // If a submission-local keytab is provided, update the Spark config so that it knows the
+    // path of the keytab in the driver container.
+    if (needKeytabUpload) {
+      val ktName = new File(keytab.get).getName()
+      Map(KEYTAB.key -> s"$KERBEROS_KEYTAB_MOUNT_POINT/$ktName")
+    } else {
+      Map.empty
+    }
   }
 
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
-    val hadoopConfConfigMap = for {
-      hName <- newHadoopConfigMapName
-      hFiles <- hadoopConfigurationFiles
-    } yield {
-      HadoopBootstrapUtil.buildHadoopConfigMap(hName, hFiles)
+    Seq[HasMetadata]() ++ {
+      krb5File.map { path =>
+        val file = new File(path)
+        new ConfigMapBuilder()
+          .withNewMetadata()
+            .withName(kubernetesConf.krbConfigMapName)
+            .endMetadata()
+          .addToData(
+            Map(file.getName() -> Files.toString(file, StandardCharsets.UTF_8)).asJava)
+          .build()
+      }
+    } ++ {
+      // If a submission-local keytab is provided, stash it in a secret.
+      if (needKeytabUpload) {
+        val kt = new File(keytab.get)
+        Seq(new SecretBuilder()
+          .withNewMetadata()
+            .withName(ktSecretName)
+            .endMetadata()
+          .addToData(kt.getName(), Base64.encodeBase64String(Files.toByteArray(kt)))
+          .build())
+      } else {
+        Nil
+      }
+    } ++ {
+      if (delegationTokens != null) {
+        Seq(new SecretBuilder()
+          .withNewMetadata()
+            .withName(dtSecretName)
+            .endMetadata()
+          .addToData(KERBEROS_SECRET_KEY, Base64.encodeBase64String(delegationTokens))
+          .build())
+      } else {
+        Nil
+      }
     }
+  }
 
-    val krb5ConfigMap = krb5File.map { fileLocation =>
-      HadoopBootstrapUtil.buildkrb5ConfigMap(
-        kubernetesConf.krbConfigMapName,
-        fileLocation)
+  // Visible for testing.
+  def createDelegationTokens(): Array[Byte] = {
--- End diff --

> Hmm, I somewhat disagree, but maybe I am just overly opinionated.

The code that creates delegation tokens is not in this class. If delegation token creation is to be tested, it needs to be tested in the test code for `HadoopDelegationTokenManager`. All this class does is take the tokens that the other class creates and stash them in a secret.

I'll change this to not need a mock at all, mostly to make a point.
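To make that division of labor concrete, here is a minimal sketch of the pattern the comment describes: the feature step only serializes whatever credentials the token manager populates, using Hadoop's standard token-storage format, and the secret-building code never needs to know how tokens were created. This is not the PR's actual implementation; the object and method names are illustrative, and the `HadoopDelegationTokenManager` call is elided because its exact signature has varied across Spark versions.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Illustrative only: token *creation* belongs to HadoopDelegationTokenManager
// (and is tested in that class's own suite); this step merely serializes the
// tokens in a Credentials object so they can be stored in a Kubernetes secret.
object DelegationTokenSketch {

  def createDelegationTokens(): Array[Byte] = {
    val creds = UserGroupInformation.getCurrentUser().getCredentials()
    // In the real step, a HadoopDelegationTokenManager would populate `creds`
    // with fresh tokens at this point; the call is elided here because its
    // signature is version-dependent.
    serialize(creds)
  }

  // Hadoop's own wire format for shipping delegation tokens around.
  private def serialize(creds: Credentials): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream()
    val dataStream = new DataOutputStream(byteStream)
    creds.writeTokenStorageToStream(dataStream)
    dataStream.flush()
    byteStream.toByteArray()
  }
}
```

With creation factored out this way, a unit test can hand the step a pre-populated `Credentials` (or a fixed byte array) and simply assert that the bytes land under the expected secret key, with no mock of the token manager at all.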