[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-12-04 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r238770191
  
--- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.security.PrivilegedExceptionAction
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model.{ConfigMap, Secret}
+import org.apache.commons.codec.binary.Base64
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
+
+class KerberosConfDriverFeatureStepSuite extends SparkFunSuite {
+
+  import KubernetesFeaturesTestUtils._
+  import SecretVolumeUtils._
+
+  private val tmpDir = Utils.createTempDir()
+
+  test("mount krb5 config map if defined") {
+    val configMap = "testConfigMap"
+    val step = createStep(
+      new SparkConf(false).set(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP, configMap))
+
+    checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), configMap)
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+    assert(filter[ConfigMap](step.getAdditionalKubernetesResources()).isEmpty)
+  }
+
+  test("create krb5.conf config map if local config provided") {
+    val krbConf = File.createTempFile("krb5", ".conf", tmpDir)
+    Files.write("some data", krbConf, UTF_8)
+
+    val sparkConf = new SparkConf(false)
+      .set(KUBERNETES_KERBEROS_KRB5_FILE, krbConf.getAbsolutePath())
+    val step = createStep(sparkConf)
+
+    val confMap = filter[ConfigMap](step.getAdditionalKubernetesResources()).head
+    assert(confMap.getData().keySet().asScala === Set(krbConf.getName()))
+
+    checkPodForKrbConf(step.configurePod(SparkPod.initialPod()),
+      confMap.getMetadata().getName())
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+  }
+
+  test("create keytab secret if client keytab file used") {
+    val keytab = File.createTempFile("keytab", ".bin", tmpDir)
+    Files.write("some data", keytab, UTF_8)
+
+    val sparkConf = new SparkConf(false)
+      .set(KEYTAB, keytab.getAbsolutePath())
+      .set(PRINCIPAL, "alice")
+    val step = createStep(sparkConf)
+
+    val pod = step.configurePod(SparkPod.initialPod())
+    assert(podHasVolume(pod.pod, KERBEROS_KEYTAB_VOLUME))
+    assert(containerHasVolume(pod.container, KERBEROS_KEYTAB_VOLUME,
+      KERBEROS_KEYTAB_MOUNT_POINT))
+
+    assert(step.getAdditionalPodSystemProperties().keys === Set(KEYTAB.key))
+
+    val secret = filter[Secret](step.getAdditionalKubernetesResources()).head
+    assert(secret.getData().keySet().asScala === Set(keytab.getName()))
+  }
+
+  test("do nothing if container-local keytab used") {
+    val sparkConf = new SparkConf(false)
+      .set(KEYTAB, "local:/my.keytab")
+      .set(PRINCIPAL, "alice")
+    val step = createStep(sparkConf)
+
+    val initial = SparkPod.initialPod()
+    assert(step.configurePod(initial) === initial)
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+    assert(step.getAdditionalKubernetesResources().isEmpty)
+  }
+
+  test("mount delegation tokens if provided") {
+    val dtSecret = "tokenSecret"
+    val sparkConf = new SparkConf(false)
+
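The quoted hunks never show the suite's `createStep` helper. A minimal sketch of what it is assumed to do, with the `KubernetesTestConf` factory name being an assumption rather than a quote from the PR:

    // Hypothetical helper: build a driver-side KubernetesConf from the given
    // SparkConf and instantiate the feature step under test.
    private def createStep(conf: SparkConf): KerberosConfDriverFeatureStep = {
      val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = conf)
      new KerberosConfDriverFeatureStep(kubernetesConf)
    }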

[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233667509
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala ---
@@ -69,97 +72,191 @@ private[spark] class KerberosConfDriverFeatureStep(
     "If a Kerberos keytab is specified you must also specify a Kerberos principal")
 
   KubernetesUtils.requireBothOrNeitherDefined(
-    existingSecretName,
-    existingSecretItemKey,
+    existingDtSecret,
+    existingDtItemKey,
     "If a secret data item-key where the data of the Kerberos Delegation Token is specified" +
       " you must also specify the name of the secret",
     "If a secret storing a Kerberos Delegation Token is specified you must also" +
       " specify the item-key where the data is stored")
 
-  private val hadoopConfigurationFiles = hadoopConfDirSpec.hadoopConfDir.map { hConfDir =>
-    HadoopBootstrapUtil.getHadoopConfFiles(hConfDir)
+  if (!hasKerberosConf) {
+    logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " +
+      "Make sure that you have the krb5.conf locally on the driver image.")
   }
-  private val newHadoopConfigMapName =
-    if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) {
-      Some(kubernetesConf.hadoopConfigMapName)
-    } else {
-      None
-    }
 
-  // Either use pre-existing secret or login to create new Secret with DT stored within
-  private val kerberosConfSpec: Option[KerberosConfigSpec] = (for {
-    secretName <- existingSecretName
-    secretItemKey <- existingSecretItemKey
-  } yield {
-    KerberosConfigSpec(
-      dtSecret = None,
-      dtSecretName = secretName,
-      dtSecretItemKey = secretItemKey,
-      jobUserName = kubeTokenManager.getCurrentUser.getShortUserName)
-  }).orElse(
-    if (isKerberosEnabled) {
-      Some(HadoopKerberosLogin.buildSpec(
-        conf,
-        kubernetesConf.appResourceNamePrefix,
-        kubeTokenManager))
-    } else {
-      None
+  // Create delegation tokens if needed. This is a lazy val so that it's not populated
+  // unnecessarily. But it needs to be accessible to different methods in this class,
+  // since it's not clear based solely on available configuration options that delegation
+  // tokens are needed when other credentials are not available.
+  private lazy val delegationTokens: Array[Byte] = if (keytab.isEmpty && existingDtSecret.isEmpty) {
+    createDelegationTokens()
+  } else {
+    null
+  }
+
+  private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_))
+
+  private def dtSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-delegation-tokens"
+
+  private def ktSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-kerberos-keytab"
+
+  private def hasKerberosConf: Boolean = krb5CMap.isDefined | krb5File.isDefined
+
+  override def configurePod(original: SparkPod): SparkPod = {
+    original.transform { case pod if hasKerberosConf =>
+      val configMapVolume = if (krb5CMap.isDefined) {
+        new VolumeBuilder()
+          .withName(KRB_FILE_VOLUME)
+          .withNewConfigMap()
+            .withName(krb5CMap.get)
+            .endConfigMap()
+          .build()
+      } else {
+        val krb5Conf = new File(krb5File.get)
+        new VolumeBuilder()
+          .withName(KRB_FILE_VOLUME)
+          .withNewConfigMap()
+            .withName(kubernetesConf.krbConfigMapName)
+            .withItems(new KeyToPathBuilder()
+              .withKey(krb5Conf.getName())
+              .withPath(krb5Conf.getName())
+              .build())
+            .endConfigMap()
+          .build()
+      }
+
+      val podWithVolume = new PodBuilder(pod.pod)
+        .editSpec()
+          .addNewVolumeLike(configMapVolume)
+            .endVolume()
+          .endSpec()
+        .build()
+
+      val containerWithMount = new ContainerBuilder(pod.container)
+        .addNewVolumeMount()
+          .withName(KRB_FILE_VOLUME)
+          .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf")
+          .withSubPath("krb5.conf")
+          .endVolumeMount()
+        .build()
+
+      SparkPod(podWithVolume, containerWithMount)
+    }.transform {
+      case pod if needKeytabUpload =>
+        // If keytab is defined and is a submission-local file (not local: URI), then create a
+        // secret for it. The keytab data will be stored in this secret below.
+        val podWitKeytab = new PodBuilder(pod.pod)
+          .editOrNewSpec()
+
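The chained `transform { case pod if ... }` calls above only compose safely if a pod passes through unchanged when a guard does not match. A minimal sketch of a `transform` helper consistent with that usage (assumed, not quoted from the PR):

    import io.fabric8.kubernetes.api.model.{Container, Pod}

    // If the partial function is not defined for this pod (e.g. the guard
    // fails), lift returns None and the original pod is returned unchanged,
    // so feature steps can chain these calls without special-casing.
    case class SparkPod(pod: Pod, container: Container) {
      def transform(fn: PartialFunction[SparkPod, SparkPod]): SparkPod =
        fn.lift(this).getOrElse(this)
    }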

[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233660132
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala ---

(quotes the same KerberosConfDriverFeatureStep.scala hunk shown above)

[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233658128
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala ---

(quotes the same KerberosConfDriverFeatureStep.scala hunk shown above)

[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233657016
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala ---

(quotes the same KerberosConfDriverFeatureStep.scala hunk shown above)

[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233654819
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala ---

(quotes the same KerberosConfDriverFeatureStep.scala hunk shown above)

[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233653814
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala ---

(quotes the same KerberosConfDriverFeatureStep.scala hunk shown above)

[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233652949
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala ---

(quotes the same KerberosConfDriverFeatureStep.scala hunk shown above)
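None of the quoted context includes `createDelegationTokens()`, which the `delegationTokens` lazy val in this hunk calls. A hedged sketch of what such a method plausibly does, assuming Spark's `HadoopDelegationTokenManager` and `SparkHadoopUtil` helpers; the exact constructor and method signatures here are assumptions:

    // Sketch under stated assumptions: obtain delegation tokens for the
    // configured Hadoop services on behalf of the current user, then
    // serialize the Credentials so they can be stored in a Kubernetes secret.
    private def createDelegationTokens(): Array[Byte] = {
      val tokenManager = new HadoopDelegationTokenManager(conf,
        SparkHadoopUtil.get.newConfiguration(conf))
      val creds = UserGroupInformation.getCurrentUser().getCredentials()
      tokenManager.obtainDelegationTokens(creds)
      SparkHadoopUtil.get.serialize(creds)
    }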

[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233651011
  
--- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala ---
@@ -0,0 +1,164 @@

(quotes an earlier revision of the test suite shown at the top of this thread, identical apart from its imports; this quote extends through the start of the delegation-token test:)

+  test("mount delegation tokens if provided") {
+    val dtSecret = "tokenSecret"
+    val sparkConf = new SparkConf(false)
+      .set(KUBERNETES_KERBEROS_DT_SECRET_NAME, dtSecret)
+      .set(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY, "dtokens")
+    val step = createStep(sparkConf)
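A sketch of the `podHasVolume` and `containerHasVolume` helpers the suite's assertions rely on, assuming `SecretVolumeUtils` simply scans the pod spec and the container's mounts by name:

    import scala.collection.JavaConverters._

    import io.fabric8.kubernetes.api.model.{Container, Pod}

    object SecretVolumeUtils {
      // True if the pod spec declares a volume with the given name.
      def podHasVolume(pod: Pod, volumeName: String): Boolean =
        pod.getSpec.getVolumes.asScala.exists(_.getName == volumeName)

      // True if the container mounts the named volume at the given path.
      def containerHasVolume(
          container: Container,
          volumeName: String,
          mountPath: String): Boolean =
        container.getVolumeMounts.asScala.exists { mount =>
          mount.getName == volumeName && mount.getMountPath == mountPath
        }
    }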

[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233650735
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala ---
@@ -81,14 +81,14 @@ private[spark] class KubernetesDriverBuilder(
 
     val driverCommandStep = provideDriverCommandStep(kubernetesConf)
 
-    val maybeHadoopConfigStep =
-      kubernetesConf.hadoopConfSpec.map { _ =>
-        provideHadoopGlobalStep(kubernetesConf)}
+    val otherSteps = Seq(
--- End diff --

All this code will go away when I get to SPARK-25877, so I'm not super 
concerned with the names here.
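
For context, the shape under discussion collects optional feature steps into one sequence by flattening Options, so a step only participates when its configuration is present. An illustrative sketch, with simplified names rather than the PR's exact code (`provideKerberosConfStep` is a hypothetical provider name):

    // Illustrative only: each Option contributes a step when defined.
    val otherSteps: Seq[KubernetesFeatureConfigStep] = Seq(
      kubernetesConf.hadoopConfSpec.map(_ => provideHadoopGlobalStep(kubernetesConf)),
      Some(provideKerberosConfStep(kubernetesConf))
    ).flatten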


---



[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233650398
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala ---

(quotes the same KerberosConfDriverFeatureStep.scala hunk shown above)

[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233650115
  
--- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala ---

(quotes the same KerberosConfDriverFeatureStepSuite.scala hunk shown above)

[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233649612
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala ---

(quotes the same KerberosConfDriverFeatureStep.scala hunk shown above)

[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233649498
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala ---

(quotes the same KubernetesDriverBuilder.scala hunk shown above)
--- End diff --

nit: `otherSteps` isn't very descriptive of the contents of the steps. I prefer `hadoopSteps`, if anything.


---



[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233649110
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
 ---
@@ -69,97 +72,191 @@ private[spark] class KerberosConfDriverFeatureStep(
 "If a Kerberos keytab is specified you must also specify a Kerberos 
principal")
 
   KubernetesUtils.requireBothOrNeitherDefined(
-existingSecretName,
-existingSecretItemKey,
+existingDtSecret,
+existingDtItemKey,
 "If a secret data item-key where the data of the Kerberos Delegation 
Token is specified" +
   " you must also specify the name of the secret",
 "If a secret storing a Kerberos Delegation Token is specified you must 
also" +
   " specify the item-key where the data is stored")
 
-  private val hadoopConfigurationFiles = 
hadoopConfDirSpec.hadoopConfDir.map { hConfDir =>
-HadoopBootstrapUtil.getHadoopConfFiles(hConfDir)
+  if (!hasKerberosConf) {
+logInfo("You have not specified a krb5.conf file locally or via a 
ConfigMap. " +
+  "Make sure that you have the krb5.conf locally on the driver image.")
   }
-  private val newHadoopConfigMapName =
-if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) {
-  Some(kubernetesConf.hadoopConfigMapName)
-} else {
-  None
-}
 
-  // Either use pre-existing secret or login to create new Secret with DT 
stored within
-  private val kerberosConfSpec: Option[KerberosConfigSpec] = (for {
-secretName <- existingSecretName
-secretItemKey <- existingSecretItemKey
-  } yield {
-KerberosConfigSpec(
-  dtSecret = None,
-  dtSecretName = secretName,
-  dtSecretItemKey = secretItemKey,
-  jobUserName = kubeTokenManager.getCurrentUser.getShortUserName)
-  }).orElse(
-if (isKerberosEnabled) {
-  Some(HadoopKerberosLogin.buildSpec(
-conf,
-kubernetesConf.appResourceNamePrefix,
-kubeTokenManager))
-} else {
-  None
+  // Create delegation tokens if needed. This is a lazy val so that it's 
not populated
+  // unnecessarily. But it needs to be accessible to different methods in 
this class,
+  // since it's not clear based solely on available configuration options 
that delegation
+  // tokens are needed when other credentials are not available.
+  private lazy val delegationTokens: Array[Byte] = if (keytab.isEmpty && 
existingDtSecret.isEmpty) {
+createDelegationTokens()
+  } else {
+null
+  }
+
+  private def needKeytabUpload: Boolean = 
keytab.exists(!Utils.isLocalUri(_))
+
+  private def dtSecretName: String = 
s"${kubernetesConf.appResourceNamePrefix}-delegation-tokens"
+
+  private def ktSecretName: String = 
s"${kubernetesConf.appResourceNamePrefix}-kerberos-keytab"
+
+  private def hasKerberosConf: Boolean = krb5CMap.isDefined | 
krb5File.isDefined
+
+  override def configurePod(original: SparkPod): SparkPod = {
+original.transform { case pod if hasKerberosConf =>
+  val configMapVolume = if (krb5CMap.isDefined) {
+new VolumeBuilder()
+  .withName(KRB_FILE_VOLUME)
+  .withNewConfigMap()
+.withName(krb5CMap.get)
+.endConfigMap()
+  .build()
+  } else {
+val krb5Conf = new File(krb5File.get)
+new VolumeBuilder()
+  .withName(KRB_FILE_VOLUME)
+  .withNewConfigMap()
+  .withName(kubernetesConf.krbConfigMapName)
+  .withItems(new KeyToPathBuilder()
+.withKey(krb5Conf.getName())
+.withPath(krb5Conf.getName())
+.build())
+  .endConfigMap()
+  .build()
+  }
+
+  val podWithVolume = new PodBuilder(pod.pod)
+.editSpec()
+  .addNewVolumeLike(configMapVolume)
+.endVolume()
+  .endSpec()
+.build()
+
+  val containerWithMount = new ContainerBuilder(pod.container)
+.addNewVolumeMount()
+  .withName(KRB_FILE_VOLUME)
+  .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf")
+  .withSubPath("krb5.conf")
+  .endVolumeMount()
+.build()
+
+  SparkPod(podWithVolume, containerWithMount)
+}.transform {
+  case pod if needKeytabUpload =>
+// If keytab is defined and is a submission-local file (not local: 
URI), then create a
+// secret for it. The keytab data will be stored in this secret 
below.
+val podWithKeytab = new PodBuilder(pod.pod)
+  .editOrNewSpec()
+
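
The quoted hunk is truncated by the archive right after `.editOrNewSpec()`. For orientation only, here is a minimal, self-contained sketch of the pattern the hunk is building toward: staging a submission-local keytab as a Kubernetes secret and mounting it into the driver pod. All names here (`stageKeytab`, the volume name, the mount path) are illustrative assumptions, not the PR's actual code.

```scala
import java.io.File
import java.nio.file.Files
import java.util.Base64

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder, Secret, SecretBuilder}

object KeytabStagingSketch {
  // Illustrative names; the real step defines its own constants.
  val keytabVolume = "kerberos-keytab"
  val keytabMountPath = "/mnt/secrets/kerberos-keytab"

  /** Stage a submission-local keytab: build a secret holding its bytes, plus
   *  a pod and container that mount that secret. */
  def stageKeytab(secretName: String, keytab: File, pod: Pod, container: Container)
      : (Secret, Pod, Container) = {
    // Secret data values are base64-encoded file contents.
    val encoded = Base64.getEncoder.encodeToString(Files.readAllBytes(keytab.toPath))
    val secret = new SecretBuilder()
      .withNewMetadata().withName(secretName).endMetadata()
      .addToData(keytab.getName, encoded)
      .build()
    val podWithVolume = new PodBuilder(pod)
      .editOrNewSpec()
        .addNewVolume()
          .withName(keytabVolume)
          .withNewSecret().withSecretName(secretName).endSecret()
          .endVolume()
        .endSpec()
      .build()
    val containerWithMount = new ContainerBuilder(container)
      .addNewVolumeMount()
        .withName(keytabVolume)
        .withMountPath(keytabMountPath)
        .endVolumeMount()
      .build()
    (secret, podWithVolume, containerWithMount)
  }
}
```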

[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233643587
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
 ---
@@ -144,6 +144,10 @@ private[spark] class BasicExecutorFeatureStep(
 .addToLimits("memory", executorMemoryQuantity)
 .addToRequests("cpu", executorCpuQuantity)
 .endResources()
+.addNewEnv()
+  .withName(ENV_SPARK_USER)
--- End diff --

If you don't do this, whatever OS user is running in the container will become 
the identity used to talk to Hadoop services (when kerberos is not on).

In YARN, for example, that would be the "yarn" user.

In k8s, with the current image, that would be "root".

You probably don't want that by default. We're talking about non-secured 
Hadoop here, so users can easily override this stuff, but by default let's at 
least try to identify the user correctly.
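
Mechanically, the hunk above boils down to something like the following sketch. The wrapper object and method are assumptions for illustration; `ENV_SPARK_USER` resolves to the `SPARK_USER` variable, which Spark's `Utils.getCurrentUserName()` checks before falling back to the OS user.

```scala
import io.fabric8.kubernetes.api.model.ContainerBuilder
import org.apache.hadoop.security.UserGroupInformation

object SparkUserEnvSketch {
  // Pin the Hadoop-facing identity to the submitting user rather than
  // whatever OS user the image happens to run as (e.g. "root").
  def withSparkUser(container: ContainerBuilder): ContainerBuilder =
    container
      .addNewEnv()
        .withName("SPARK_USER") // ENV_SPARK_USER in Spark's k8s Constants
        .withValue(UserGroupInformation.getCurrentUser.getShortUserName)
        .endEnv()
}
```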


---




[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233642625
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
 ---
@@ -144,6 +144,10 @@ private[spark] class BasicExecutorFeatureStep(
 .addToLimits("memory", executorMemoryQuantity)
 .addToRequests("cpu", executorCpuQuantity)
 .endResources()
+.addNewEnv()
+  .withName(ENV_SPARK_USER)
--- End diff --

I see that you noted that this is always done across resource managers. Just 
wondering, what is the reason for that? I had introduced it exclusively in 
the HadoopSteps


---




[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233591350
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DelegationTokenFeatureStep.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, 
PodBuilder, SecretBuilder}
+import org.apache.commons.codec.binary.Base64
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, 
SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.config._
+
+/**
+ * Delegation token support for Spark apps on kubernetes.
+ *
+ * When preparing driver resources, this step will generate delegation 
tokens for the app if
+ * they're needed.
+ *
+ * When preparing pods, this step will mount the delegation token secret 
(either pre-defined,
+ * or generated by this step when preparing the driver).
+ */
+private[spark] class DelegationTokenFeatureStep(conf: KubernetesConf[_], 
isDriver: Boolean)
--- End diff --

I think you're missing one use case there. There are 3 use cases:

- keytab: keytab is provided to driver, driver handles kerberos login, 
creates tokens, distributes tokens to executors
- pre-defined token secret: secret is mounted in the driver, env variable 
is set, driver loads them and distributes to executors
- default kerberos case: submission client generates delegation tokens, 
creates a secret for them, then this behaves like the bullet above

The third use case is actually the most common. In your reply above you're 
only covering the other two. My code covers all three.

I'm just saying that this code actually doesn't need to do anything on the 
executor side, because the driver takes care of everything when the credentials 
are provided.
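
A compact sketch of how those three cases resolve on the driver side. The names reuse identifiers visible in the quoted hunks (`keytab`, `existingDtSecret`, `createDelegationTokens`), but the method itself is illustrative, not code from the PR.

```scala
// Sketch of the three delegation-token cases described above.
def resolveDelegationTokens(
    keytab: Option[String],
    existingDtSecret: Option[String],
    createDelegationTokens: () => Array[Byte]): Option[Array[Byte]] = {
  (keytab, existingDtSecret) match {
    // 1) keytab: the driver logs in itself and creates/renews tokens later.
    case (Some(_), _) => None
    // 2) pre-defined secret: tokens are already mounted, nothing to create.
    case (None, Some(_)) => None
    // 3) default kerberos case: the submission client creates tokens up front.
    case (None, None) => Some(createDelegationTokens())
  }
}
```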


---




[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233588339
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DelegationTokenFeatureStep.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, 
PodBuilder, SecretBuilder}
+import org.apache.commons.codec.binary.Base64
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, 
SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.config._
+
+/**
+ * Delegation token support for Spark apps on kubernetes.
+ *
+ * When preparing driver resources, this step will generate delegation 
tokens for the app if
+ * they're needed.
+ *
+ * When preparing pods, this step will mount the delegation token secret 
(either pre-defined,
+ * or generated by this step when preparing the driver).
+ */
+private[spark] class DelegationTokenFeatureStep(conf: KubernetesConf[_], 
isDriver: Boolean)
--- End diff --

This is what I meant above when I said that the `HadoopKerberosLogin` logic 
could be deleted. The assumption here is that the secret should not be created 
as the keytab will use the HadoopDelegationTokenManager logic. The only secret 
that should be _created_ would be the keytab. However, I personally thought 
that we should point to a secretName that is either the delegationToken or the 
keytab. That is why I suggested that the secretName and secretItemKey remain. 


---




[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233578429
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DelegationTokenFeatureStep.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, 
PodBuilder, SecretBuilder}
+import org.apache.commons.codec.binary.Base64
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, 
SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.config._
+
+/**
+ * Delegation token support for Spark apps on kubernetes.
+ *
+ * When preparing driver resources, this step will generate delegation 
tokens for the app if
+ * they're needed.
+ *
+ * When preparing pods, this step will mount the delegation token secret 
(either pre-defined,
+ * or generated by this step when preparing the driver).
+ */
+private[spark] class DelegationTokenFeatureStep(conf: KubernetesConf[_], 
isDriver: Boolean)
--- End diff --

During the recent discussion in the sig meeting I was thinking about this 
guy. I think it may not be necessary to mount this secret in executors; instead, 
the driver can propagate the tokens to executors through its normal means.

e.g. later if/when adding code to monitor delegation tokens for updates, 
that would mean only the driver has to do it.

I'll give this a try; if it works, it might be feasible to re-merge this 
code with the kerberos step.
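
In code terms, the suggestion reduces to gating the secret mount on the pod's role; a minimal sketch, assuming an `isDriver` flag and a `mountDtSecret` helper (neither taken from the PR):

```scala
import org.apache.spark.deploy.k8s.SparkPod

// Only the driver mounts the delegation-token secret; executors receive
// tokens from the driver over Spark's existing RPC channel.
def configurePod(pod: SparkPod, isDriver: Boolean)(
    mountDtSecret: SparkPod => SparkPod): SparkPod =
  if (isDriver) mountDtSecret(pod) else pod
```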


---




[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233542062
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
 ---
@@ -18,7 +18,30 @@ package org.apache.spark.deploy.k8s
 
 import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, 
PodBuilder}
 
-private[spark] case class SparkPod(pod: Pod, container: Container)
+private[spark] case class SparkPod(pod: Pod, container: Container) {
+
+  /**
+   * Convenience method to apply a series of chained transformations to a 
pod.
+   *
+   * Use it like:
+   *
+   * original.transform { case pod =>
+   *   // update pod and return new one
+   * }.transform { case pod =>
+   *   // more changes that create a new pod
+   * }.transform {
+   *   case pod if someCondition => // new pod
+   * }
+   *
+   * This makes it cleaner to apply multiple transformations, avoiding 
having to create
+   * a bunch of awkwardly-named local variables. Since the argument is a 
partial function,
+   * it can do matching without needing to exhaust all the possibilities. 
If the function
+   * is not applied, then the original pod will be kept.
+   */
+  def transform(fn: PartialFunction[SparkPod, SparkPod]): SparkPod = 
fn.lift(this).getOrElse(this)
--- End diff --

I would think that this change is out of the scope of this PR, but I do 
love the use of a PartialFunction here. Thanks for this! 


---




[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r233541147
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
 ---
@@ -18,7 +18,30 @@ package org.apache.spark.deploy.k8s
 
 import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, 
PodBuilder}
 
-private[spark] case class SparkPod(pod: Pod, container: Container)
+private[spark] case class SparkPod(pod: Pod, container: Container) {
+
+  /**
+   * Convenience method to apply a series of chained transformations to a 
pod.
+   *
+   * Use it like:
+   *
+   * original.transform { case pod =>
+   *   // update pod and return new one
+   * }.transform { case pod =>
+   *   // more changes that create a new pod
+   * }.transform {
+   *   case pod if someCondition => // new pod
+   * }
+   *
+   * This makes it cleaner to apply multiple transformations, avoiding 
having to create
+   * a bunch of awkwardly-named local variables. Since the argument is a 
partial function,
+   * it can do matching without needing to exhaust all the possibilities. 
If the function
+   * is not applied, then the original pod will be kept.
+   */
+  def transform(fn: PartialFunction[SparkPod, SparkPod]): SparkPod = 
fn.lift(this).getOrElse(this)
--- End diff --

I added this because I started to get tired of code like the following:

```
val someIntermediateName = someOption.map { blah =>
   // create the updated pod
}.getOrElse(previousPodName)

// lather, rinse, repeat
```

To me that's hard to follow and brittle, and this pattern makes things 
clearer IMO.
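
For contrast, here is the same kind of two-step update written against the `transform` helper from the quoted hunk; the condition flags and transformation functions are placeholders, not code from the PR.

```scala
import org.apache.spark.deploy.k8s.SparkPod

// The map/getOrElse chains above, rewritten with the transform helper.
// Cases that do not match fall through and keep the original pod, because
// transform is fn.lift(this).getOrElse(this).
def configure(
    original: SparkPod,
    hasKerberosConf: Boolean,
    needKeytabUpload: Boolean)(
    addKrbConf: SparkPod => SparkPod,
    addKeytabRef: SparkPod => SparkPod): SparkPod = {
  original.transform {
    case pod if hasKerberosConf => addKrbConf(pod)
  }.transform {
    case pod if needKeytabUpload => addKeytabRef(pod)
  }
}
```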


---




[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-06 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r231359962
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -123,7 +126,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def createDriverEndpoint(properties: Seq[(String, String)]): 
DriverEndpoint = {
-new KubernetesDriverEndpoint(rpcEnv, properties)
+new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
+  }
+
+  override protected def createTokenManager(): 
Option[HadoopDelegationTokenManager] = {
+Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
--- End diff --

Yeah, I can always throw up a follow-up for that. No worries


---




[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r231348306
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -123,7 +126,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def createDriverEndpoint(properties: Seq[(String, String)]): 
DriverEndpoint = {
-new KubernetesDriverEndpoint(rpcEnv, properties)
+new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
+  }
+
+  override protected def createTokenManager(): 
Option[HadoopDelegationTokenManager] = {
+Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
--- End diff --

Ah, ok I get it now. I can do that. I'll try to include support for (3) but 
it depends on how much I have to touch other parts of the code. Hopefully not 
much.




---




[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-06 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r231344398
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -123,7 +126,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def createDriverEndpoint(properties: Seq[(String, String)]): 
DriverEndpoint = {
-new KubernetesDriverEndpoint(rpcEnv, properties)
+new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
+  }
+
+  override protected def createTokenManager(): 
Option[HadoopDelegationTokenManager] = {
+Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
--- End diff --

Oh, I was referencing the creation of the `Delegation Token` secret when a 
`--keytab` is specified. I believe you are right that in client mode 
you would not need to worry about running this step. But I think the 3rd option 
would be good to include here. I think that with the introduction of 
`HadoopDelegationTokenManager` we should remove the creation of the `dtSecret`, 
and that should be included in this PR if you are introducing this. Therefore, 
I think it is sensible to refactor the `KerberosConfigSpec` to have a generic 
`secret`, `secretName`, and `secretItemKey` that would contain either a 
`DelegationToken` or a `keytab`, such that the code block: 
```
  private val kerberosConfSpec: Option[KerberosConfigSpec] = (for {
secretName <- existingSecretName
secretItemKey <- existingSecretItemKey
  } yield {
KerberosConfigSpec(
  secret = None,
  secretName = secretName,
  secretItemKey = secretItemKey,
  jobUserName = kubeTokenManager.getCurrentUser.getShortUserName)
  }).orElse(
if (isKerberosEnabled) {
  keytab.map { ... }
} else {
  None
}
```
would return a kerberosConfSpec that would account for either case. Erm, 
that would also mean that you could delete the `HadoopKerberosLogin` method. 
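
Concretely, the generalized spec being proposed would look roughly like the following; the field names come from the code block above, not from any merged code.

```scala
import io.fabric8.kubernetes.api.model.Secret

// Sketch of the proposed generic spec: the referenced secret may hold either
// a delegation token or a keytab. `secret` is Some(...) only when this step
// creates the secret itself rather than pointing at a pre-existing one.
private[spark] case class KerberosConfigSpec(
    secret: Option[Secret],
    secretName: String,
    secretItemKey: String,
    jobUserName: String)
```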


---




[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r231256397
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -123,7 +126,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def createDriverEndpoint(properties: Seq[(String, String)]): 
DriverEndpoint = {
-new KubernetesDriverEndpoint(rpcEnv, properties)
+new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
+  }
+
+  override protected def createTokenManager(): 
Option[HadoopDelegationTokenManager] = {
+Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
--- End diff --

I'm not sure I follow your train of thought here so I'll comment on what I 
understand.

First, the code that creates the secret is in 
`KerberosConfDriverFeatureStep`. As far as I know, that class is not used in 
client mode. In client mode the keytab stays in the client machine, with the 
driver, and the driver just sends DTs to executors. So the whole discussion 
about secrets is irrelevant in that case.

In cluster mode, you need the driver to have access to the keytab for this 
feature to work. There are a few ways to achieve that:

- the current YARN approach, in which the keytab lives on the submission host 
and is distributed with the application. In k8s this would amount to what I 
have here: the submission code creates a secret for the driver pod and stashes 
the keytab in it.

- add the ability to store the keytab in an external place (like HDFS or an 
HTTP server). That has drawbacks (e.g. people probably wouldn't like that, and 
there's a chicken & egg problem in HDFS, so you'd still need a kerberos TGT to 
bootstrap things).

- add a k8s-specific feature of mounting a pre-defined secret in the driver 
pod. I believe this is what you're suggesting?

I think supporting the first is easy as this change shows, and keeps 
feature parity with what's already supported in YARN. Unless there's a glaring 
issue with using secrets that I'm not aware of, I don't see a reason for not 
doing it.

The third option (pre-defined secret) could also be added. My hope is that 
you could do it with pre-existing configs (`spark.kubernetes.driver.secrets.` & 
company), but I don't know how you'd set the `spark.kerberos.keytab` and 
`spark.kerberos.principal` configs just in the driver - and not in the 
submission client. So it seems we'd need at least a little bit of code here to 
support that scenario.
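
For the first option, the user-visible surface is just the standard keytab settings; a sketch with example values (the secret staging itself happens inside the submission client):

```scala
import org.apache.spark.SparkConf

// Cluster-mode submission where the keytab lives on the submission host.
// The path and principal below are example values.
val conf = new SparkConf()
  .set("spark.kerberos.keytab", "/path/to/alice.keytab")
  .set("spark.kerberos.principal", "alice@EXAMPLE.COM")
// The pre-defined-secret option would instead rely on the existing
// spark.kubernetes.driver.secrets.[secret-name] mounting mechanism, plus some
// way to point the driver at the mounted keytab.
```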


---




[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-06 Thread rvesse
Github user rvesse commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r231195413
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
 ---
@@ -126,20 +134,53 @@ private[spark] class KerberosConfDriverFeatureStep(
   HadoopBootstrapUtil.bootstrapSparkUserPod(
 kubeTokenManager.getCurrentUser.getShortUserName,
 hadoopBasedSparkPod))
+
+if (keytab.isDefined) {
+  val podWitKeytab = new PodBuilder(kerberizedPod.pod)
--- End diff --

Typo - `Wit` -> `With`


---




[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-11-05 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22911#discussion_r230860519
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -123,7 +126,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def createDriverEndpoint(properties: Seq[(String, String)]): 
DriverEndpoint = {
-new KubernetesDriverEndpoint(rpcEnv, properties)
+new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
+  }
+
+  override protected def createTokenManager(): 
Option[HadoopDelegationTokenManager] = {
+Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
--- End diff --

If we are introducing this change, I think it is important that we talk 
about the future of secret creation upon using `--keytab` + `--principal`. Right 
now, secrets are created when a keytab is used by the client, or for client mode 
by the driver; this was used primarily for testing (on my end) but also because 
this logic wasn't previously generalized for all cluster managers. Should we 
create an option for the user to create a secret, or get rid of it as a whole, 
given that delegation token logic is handled via the UpdateDelegationToken 
message-passing framework? In essence, if we leave the ability to create a secret, 
we are obtaining a DT twice, which is extraneous. And if we are removing it, it is 
sensible to refactor the KerberosConfig logic to account for this removal. I 
was planning to do this in my token renewal PR, where I was also introducing 
this change, but it seems that this will probably get merged in before mine; as 
such, here would be a better place to refactor. Or maybe a separate PR that 
introduces this line and does the refactor, and then this and my 
PR could be introduced subsequently. 

thoughts, @vanzin ? 


---




[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...

2018-10-31 Thread vanzin
GitHub user vanzin opened a pull request:

https://github.com/apache/spark/pull/22911

[SPARK-25815][k8s] Support kerberos in client mode, keytab-based token 
renewal.

This change hooks up the k8s backend to the updated 
HadoopDelegationTokenManager,
so that delegation tokens are also available in client mode.

It also closes the remaining gap in kerberos support which is to support
the current keytab-based token renewal supported by YARN (and partially by
Mesos). That is done by stashing the keytab in a secret on the driver pod,
and then providing it to the HadoopDelegationTokenManager when the
driver starts.

Tested in cluster mode with and without a keytab, and also in client
mode.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vanzin/spark SPARK-25815

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22911.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22911


commit 7f86e231922648a9da07ae73026a3a430d2c83f5
Author: Marcelo Vanzin 
Date:   2018-10-19T20:33:47Z

[SPARK-25815][k8s] Support kerberos in client mode, keytab-based token 
renewal.

This change hooks up the k8s backend to the updated 
HadoopDelegationTokenManager,
so that delegation tokens are also available in client mode.

It also closes the remaining gap in kerberos support which is to support
the current keytab-based token renewal supported by YARN (and partially by
Mesos). That is done by stashing the keytab in a secret on the driver pod,
and then providing it to the HadoopDelegationTokenManager when the
driver starts.

Tested in cluster mode with and without a keytab, and also in client
mode.




---
