[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r238770191 --- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features + +import java.io.File +import java.nio.charset.StandardCharsets.UTF_8 +import java.security.PrivilegedExceptionAction + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model.{ConfigMap, Secret} +import org.apache.commons.codec.binary.Base64 +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.{Credentials, UserGroupInformation} + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource +import org.apache.spark.internal.config._ +import org.apache.spark.util.Utils + +class KerberosConfDriverFeatureStepSuite extends SparkFunSuite { + + import KubernetesFeaturesTestUtils._ + import SecretVolumeUtils._ + + private val tmpDir = Utils.createTempDir() + + test("mount krb5 config map if defined") { +val configMap = "testConfigMap" +val step = createStep( + new SparkConf(false).set(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP, configMap)) + +checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), configMap) +assert(step.getAdditionalPodSystemProperties().isEmpty) + assert(filter[ConfigMap](step.getAdditionalKubernetesResources()).isEmpty) + } + + test("create krb5.conf config map if local config provided") { +val krbConf = File.createTempFile("krb5", ".conf", tmpDir) +Files.write("some data", krbConf, UTF_8) + +val sparkConf = new SparkConf(false) + .set(KUBERNETES_KERBEROS_KRB5_FILE, krbConf.getAbsolutePath()) +val step = createStep(sparkConf) + +val confMap = filter[ConfigMap](step.getAdditionalKubernetesResources()).head +assert(confMap.getData().keySet().asScala === Set(krbConf.getName())) + +checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), confMap.getMetadata().getName()) 
+assert(step.getAdditionalPodSystemProperties().isEmpty) + } + + test("create keytab secret if client keytab file used") { +val keytab = File.createTempFile("keytab", ".bin", tmpDir) +Files.write("some data", keytab, UTF_8) + +val sparkConf = new SparkConf(false) + .set(KEYTAB, keytab.getAbsolutePath()) + .set(PRINCIPAL, "alice") +val step = createStep(sparkConf) + +val pod = step.configurePod(SparkPod.initialPod()) +assert(podHasVolume(pod.pod, KERBEROS_KEYTAB_VOLUME)) +assert(containerHasVolume(pod.container, KERBEROS_KEYTAB_VOLUME, KERBEROS_KEYTAB_MOUNT_POINT)) + +assert(step.getAdditionalPodSystemProperties().keys === Set(KEYTAB.key)) + +val secret = filter[Secret](step.getAdditionalKubernetesResources()).head +assert(secret.getData().keySet().asScala === Set(keytab.getName())) + } + + test("do nothing if container-local keytab used") { +val sparkConf = new SparkConf(false) + .set(KEYTAB, "local:/my.keytab") + .set(PRINCIPAL, "alice") +val step = createStep(sparkConf) + +val initial = SparkPod.initialPod() +assert(step.configurePod(initial) === initial) +assert(step.getAdditionalPodSystemProperties().isEmpty) +assert(step.getAdditionalKubernetesResources()
[GitHub] spark issue #23053: [SPARK-25957][K8S] Make building alternate language bind...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/23053 LGTM. I will let @mccheah or @squito sign off on it with any additional comments / reviews --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23053: [SPARK-25957][K8S] Make building alternate language bind...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/23053 You need to make the appropriate change to `setup-integration-test-env.sh` to ensure that the PySpark tests pass in the integration-test suite --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23053: [SPARK-25957][K8S] Add ability to skip building optional...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/23053 I agree with @mccheah; `-p skip` seems odd. It seems cleaner to include the `-p` flag only when we want Python support. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23017: [SPARK-26015][K8S] Set a default UID for Spark on K8S Im...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/23017 https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-make-spark-distribution-unified/5086/ seems to be hanging on the distribution build (45min+) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23017: [SPARK-26015][K8S] Set a default UID for Spark on K8S Im...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/23017 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23037: [SPARK-26083][k8s] Add Copy pyspark into corresponding d...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/23037 > This is fine, but please file a bug. Okay. In that case, @AzureQ, could you add an integration test to `ClientModeTestsSuite`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23026: [SPARK-25960][k8s] Support subpath mounting with Kuberne...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/23026 > if such a list exists it should be the same list that triggers regular tests. I defer that to @shaneknapp --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23026: [SPARK-25960][k8s] Support subpath mounting with Kuberne...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/23026 @vanzin, I added you to the admin list for triggering the `testing-k8s-prb-make-spark-distribution-unified` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23026: [SPARK-25960][k8s] Support subpath mounting with Kuberne...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/23026 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23037: [MINOR][k8s] Add Copy pyspark into corresponding dir cmd...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/23037 I see this customization as specific to how you build your custom Docker image. Unless it is relevant to testing, we are trying to keep the default Docker image as lightweight as possible (as long as it passes our test cases). Unless one of the committers sees this as an important thing to include in the default image, I consider it a customization. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r233660132 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala --- @@ -69,97 +72,191 @@ private[spark] class KerberosConfDriverFeatureStep( "If a Kerberos keytab is specified you must also specify a Kerberos principal") KubernetesUtils.requireBothOrNeitherDefined( -existingSecretName, -existingSecretItemKey, +existingDtSecret, +existingDtItemKey, "If a secret data item-key where the data of the Kerberos Delegation Token is specified" + " you must also specify the name of the secret", "If a secret storing a Kerberos Delegation Token is specified you must also" + " specify the item-key where the data is stored") - private val hadoopConfigurationFiles = hadoopConfDirSpec.hadoopConfDir.map { hConfDir => -HadoopBootstrapUtil.getHadoopConfFiles(hConfDir) + if (!hasKerberosConf) { +logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " + + "Make sure that you have the krb5.conf locally on the driver image.") } - private val newHadoopConfigMapName = -if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) { - Some(kubernetesConf.hadoopConfigMapName) -} else { - None -} - // Either use pre-existing secret or login to create new Secret with DT stored within - private val kerberosConfSpec: Option[KerberosConfigSpec] = (for { -secretName <- existingSecretName -secretItemKey <- existingSecretItemKey - } yield { -KerberosConfigSpec( - dtSecret = None, - dtSecretName = secretName, - dtSecretItemKey = secretItemKey, - jobUserName = kubeTokenManager.getCurrentUser.getShortUserName) - }).orElse( -if (isKerberosEnabled) { - Some(HadoopKerberosLogin.buildSpec( -conf, -kubernetesConf.appResourceNamePrefix, -kubeTokenManager)) -} else { - None + // Create delegation tokens if needed. This is a lazy val so that it's not populated + // unnecessarily. 
But it needs to be accessible to different methods in this class, + // since it's not clear based solely on available configuration options that delegation + // tokens are needed when other credentials are not available. + private lazy val delegationTokens: Array[Byte] = if (keytab.isEmpty && existingDtSecret.isEmpty) { +createDelegationTokens() + } else { +null + } + + private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_)) + + private def dtSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-delegation-tokens" + + private def ktSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-kerberos-keytab" + + private def hasKerberosConf: Boolean = krb5CMap.isDefined | krb5File.isDefined + + override def configurePod(original: SparkPod): SparkPod = { +original.transform { case pod if hasKerberosConf => + val configMapVolume = if (krb5CMap.isDefined) { +new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() +.withName(krb5CMap.get) +.endConfigMap() + .build() + } else { +val krb5Conf = new File(krb5File.get) +new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() + .withName(kubernetesConf.krbConfigMapName) + .withItems(new KeyToPathBuilder() +.withKey(krb5Conf.getName()) +.withPath(krb5Conf.getName()) +.build()) + .endConfigMap() + .build() + } + + val podWithVolume = new PodBuilder(pod.pod) +.editSpec() + .addNewVolumeLike(configMapVolume) +.endVolume() + .endSpec() +.build() + + val containerWithMount = new ContainerBuilder(pod.container) +.addNewVolumeMount() + .withName(KRB_FILE_VOLUME) + .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf") + .withSubPath("krb5.conf") + .endVolumeMount() +.build() + + SparkPod(podWithVolume, containerWithMount) +}.transform { + case pod if needKeytabUpload => +// If keytab is defined and is a submission-local file (not local: URI), then create a +// secret for
[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r233657016 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala --- @@ -69,97 +72,191 @@ private[spark] class KerberosConfDriverFeatureStep( "If a Kerberos keytab is specified you must also specify a Kerberos principal") KubernetesUtils.requireBothOrNeitherDefined( -existingSecretName, -existingSecretItemKey, +existingDtSecret, +existingDtItemKey, "If a secret data item-key where the data of the Kerberos Delegation Token is specified" + " you must also specify the name of the secret", "If a secret storing a Kerberos Delegation Token is specified you must also" + " specify the item-key where the data is stored") - private val hadoopConfigurationFiles = hadoopConfDirSpec.hadoopConfDir.map { hConfDir => -HadoopBootstrapUtil.getHadoopConfFiles(hConfDir) + if (!hasKerberosConf) { +logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " + + "Make sure that you have the krb5.conf locally on the driver image.") } - private val newHadoopConfigMapName = -if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) { - Some(kubernetesConf.hadoopConfigMapName) -} else { - None -} - // Either use pre-existing secret or login to create new Secret with DT stored within - private val kerberosConfSpec: Option[KerberosConfigSpec] = (for { -secretName <- existingSecretName -secretItemKey <- existingSecretItemKey - } yield { -KerberosConfigSpec( - dtSecret = None, - dtSecretName = secretName, - dtSecretItemKey = secretItemKey, - jobUserName = kubeTokenManager.getCurrentUser.getShortUserName) - }).orElse( -if (isKerberosEnabled) { - Some(HadoopKerberosLogin.buildSpec( -conf, -kubernetesConf.appResourceNamePrefix, -kubeTokenManager)) -} else { - None + // Create delegation tokens if needed. This is a lazy val so that it's not populated + // unnecessarily. 
But it needs to be accessible to different methods in this class, + // since it's not clear based solely on available configuration options that delegation + // tokens are needed when other credentials are not available. + private lazy val delegationTokens: Array[Byte] = if (keytab.isEmpty && existingDtSecret.isEmpty) { +createDelegationTokens() + } else { +null + } + + private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_)) + + private def dtSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-delegation-tokens" + + private def ktSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-kerberos-keytab" + + private def hasKerberosConf: Boolean = krb5CMap.isDefined | krb5File.isDefined + + override def configurePod(original: SparkPod): SparkPod = { +original.transform { case pod if hasKerberosConf => + val configMapVolume = if (krb5CMap.isDefined) { +new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() +.withName(krb5CMap.get) +.endConfigMap() + .build() + } else { +val krb5Conf = new File(krb5File.get) +new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() + .withName(kubernetesConf.krbConfigMapName) + .withItems(new KeyToPathBuilder() +.withKey(krb5Conf.getName()) +.withPath(krb5Conf.getName()) +.build()) + .endConfigMap() + .build() + } + + val podWithVolume = new PodBuilder(pod.pod) +.editSpec() + .addNewVolumeLike(configMapVolume) +.endVolume() + .endSpec() +.build() + + val containerWithMount = new ContainerBuilder(pod.container) +.addNewVolumeMount() + .withName(KRB_FILE_VOLUME) + .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf") + .withSubPath("krb5.conf") + .endVolumeMount() +.build() + + SparkPod(podWithVolume, containerWithMount) +}.transform { + case pod if needKeytabUpload => +// If keytab is defined and is a submission-local file (not local: URI), then create a +// secret for
[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r233652949 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala --- @@ -69,97 +72,191 @@ private[spark] class KerberosConfDriverFeatureStep( "If a Kerberos keytab is specified you must also specify a Kerberos principal") KubernetesUtils.requireBothOrNeitherDefined( -existingSecretName, -existingSecretItemKey, +existingDtSecret, +existingDtItemKey, "If a secret data item-key where the data of the Kerberos Delegation Token is specified" + " you must also specify the name of the secret", "If a secret storing a Kerberos Delegation Token is specified you must also" + " specify the item-key where the data is stored") - private val hadoopConfigurationFiles = hadoopConfDirSpec.hadoopConfDir.map { hConfDir => -HadoopBootstrapUtil.getHadoopConfFiles(hConfDir) + if (!hasKerberosConf) { +logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " + + "Make sure that you have the krb5.conf locally on the driver image.") } - private val newHadoopConfigMapName = -if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) { - Some(kubernetesConf.hadoopConfigMapName) -} else { - None -} - // Either use pre-existing secret or login to create new Secret with DT stored within - private val kerberosConfSpec: Option[KerberosConfigSpec] = (for { -secretName <- existingSecretName -secretItemKey <- existingSecretItemKey - } yield { -KerberosConfigSpec( - dtSecret = None, - dtSecretName = secretName, - dtSecretItemKey = secretItemKey, - jobUserName = kubeTokenManager.getCurrentUser.getShortUserName) - }).orElse( -if (isKerberosEnabled) { - Some(HadoopKerberosLogin.buildSpec( -conf, -kubernetesConf.appResourceNamePrefix, -kubeTokenManager)) -} else { - None + // Create delegation tokens if needed. This is a lazy val so that it's not populated + // unnecessarily. 
But it needs to be accessible to different methods in this class, + // since it's not clear based solely on available configuration options that delegation + // tokens are needed when other credentials are not available. + private lazy val delegationTokens: Array[Byte] = if (keytab.isEmpty && existingDtSecret.isEmpty) { +createDelegationTokens() + } else { +null + } + + private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_)) + + private def dtSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-delegation-tokens" + + private def ktSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-kerberos-keytab" + + private def hasKerberosConf: Boolean = krb5CMap.isDefined | krb5File.isDefined + + override def configurePod(original: SparkPod): SparkPod = { +original.transform { case pod if hasKerberosConf => + val configMapVolume = if (krb5CMap.isDefined) { +new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() +.withName(krb5CMap.get) +.endConfigMap() + .build() + } else { +val krb5Conf = new File(krb5File.get) +new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() + .withName(kubernetesConf.krbConfigMapName) + .withItems(new KeyToPathBuilder() +.withKey(krb5Conf.getName()) +.withPath(krb5Conf.getName()) +.build()) + .endConfigMap() + .build() + } + + val podWithVolume = new PodBuilder(pod.pod) +.editSpec() + .addNewVolumeLike(configMapVolume) +.endVolume() + .endSpec() +.build() + + val containerWithMount = new ContainerBuilder(pod.container) +.addNewVolumeMount() + .withName(KRB_FILE_VOLUME) + .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf") + .withSubPath("krb5.conf") + .endVolumeMount() +.build() + + SparkPod(podWithVolume, containerWithMount) +}.transform { + case pod if needKeytabUpload => +// If keytab is defined and is a submission-local file (not local: URI), then create a +// secret for
[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r233650115 --- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features + +import java.io.File +import java.nio.charset.StandardCharsets.UTF_8 + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model.{ConfigMap, Secret} +import org.mockito.Mockito._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource +import org.apache.spark.internal.config._ +import org.apache.spark.util.Utils + +class KerberosConfDriverFeatureStepSuite extends SparkFunSuite { + + import KubernetesFeaturesTestUtils._ + import SecretVolumeUtils._ + + private val tmpDir = Utils.createTempDir() + + test("mount krb5 config map if defined") { +val configMap = "testConfigMap" +val step = createStep( + new SparkConf(false).set(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP, configMap)) + +checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), configMap) +assert(step.getAdditionalPodSystemProperties().isEmpty) + assert(filter[ConfigMap](step.getAdditionalKubernetesResources()).isEmpty) + } + + test("create krb5.conf config map if local config provided") { +val krbConf = File.createTempFile("krb5", ".conf", tmpDir) +Files.write("some data", krbConf, UTF_8) + +val sparkConf = new SparkConf(false) + .set(KUBERNETES_KERBEROS_KRB5_FILE, krbConf.getAbsolutePath()) +val step = createStep(sparkConf) + +val confMap = filter[ConfigMap](step.getAdditionalKubernetesResources()).head +assert(confMap.getData().keySet().asScala === Set(krbConf.getName())) + +checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), confMap.getMetadata().getName()) +assert(step.getAdditionalPodSystemProperties().isEmpty) + } + + test("create keytab secret if client keytab file used") { +val keytab = File.createTempFile("keytab", ".bin", tmpDir) +Files.write("some data", keytab, UTF_8) + +val sparkConf = 
new SparkConf(false) + .set(KEYTAB, keytab.getAbsolutePath()) + .set(PRINCIPAL, "alice") +val step = createStep(sparkConf) + +val pod = step.configurePod(SparkPod.initialPod()) +assert(podHasVolume(pod.pod, KERBEROS_KEYTAB_VOLUME)) +assert(containerHasVolume(pod.container, KERBEROS_KEYTAB_VOLUME, KERBEROS_KEYTAB_MOUNT_POINT)) + +assert(step.getAdditionalPodSystemProperties().keys === Set(KEYTAB.key)) + +val secret = filter[Secret](step.getAdditionalKubernetesResources()).head +assert(secret.getData().keySet().asScala === Set(keytab.getName())) + } + + test("do nothing if container-local keytab used") { +val sparkConf = new SparkConf(false) + .set(KEYTAB, "local:/my.keytab") + .set(PRINCIPAL, "alice") +val step = createStep(sparkConf) + +val initial = SparkPod.initialPod() +assert(step.configurePod(initial) === initial) +assert(step.getAdditionalPodSystemProperties().isEmpty) +assert(step.getAdditionalKubernetesResources().isEmpty) + } + + test("mount delegation tokens if provided") { +val dtSecret = "tokenSecret" +val sparkConf = new SparkConf(false) + .set(KUBERNETES_KERBEROS_DT_SE
[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r233649498 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala --- @@ -81,14 +81,14 @@ private[spark] class KubernetesDriverBuilder( val driverCommandStep = provideDriverCommandStep(kubernetesConf) -val maybeHadoopConfigStep = - kubernetesConf.hadoopConfSpec.map { _ => -provideHadoopGlobalStep(kubernetesConf)} +val otherSteps = Seq( --- End diff -- nit: `otherSteps` isn't very descriptive of the contents of the steps. I prefer `hadoopSteps`, if anything. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r233649110 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala --- @@ -69,97 +72,191 @@ private[spark] class KerberosConfDriverFeatureStep( "If a Kerberos keytab is specified you must also specify a Kerberos principal") KubernetesUtils.requireBothOrNeitherDefined( -existingSecretName, -existingSecretItemKey, +existingDtSecret, +existingDtItemKey, "If a secret data item-key where the data of the Kerberos Delegation Token is specified" + " you must also specify the name of the secret", "If a secret storing a Kerberos Delegation Token is specified you must also" + " specify the item-key where the data is stored") - private val hadoopConfigurationFiles = hadoopConfDirSpec.hadoopConfDir.map { hConfDir => -HadoopBootstrapUtil.getHadoopConfFiles(hConfDir) + if (!hasKerberosConf) { +logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " + + "Make sure that you have the krb5.conf locally on the driver image.") } - private val newHadoopConfigMapName = -if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) { - Some(kubernetesConf.hadoopConfigMapName) -} else { - None -} - // Either use pre-existing secret or login to create new Secret with DT stored within - private val kerberosConfSpec: Option[KerberosConfigSpec] = (for { -secretName <- existingSecretName -secretItemKey <- existingSecretItemKey - } yield { -KerberosConfigSpec( - dtSecret = None, - dtSecretName = secretName, - dtSecretItemKey = secretItemKey, - jobUserName = kubeTokenManager.getCurrentUser.getShortUserName) - }).orElse( -if (isKerberosEnabled) { - Some(HadoopKerberosLogin.buildSpec( -conf, -kubernetesConf.appResourceNamePrefix, -kubeTokenManager)) -} else { - None + // Create delegation tokens if needed. This is a lazy val so that it's not populated + // unnecessarily. 
But it needs to be accessible to different methods in this class, + // since it's not clear based solely on available configuration options that delegation + // tokens are needed when other credentials are not available. + private lazy val delegationTokens: Array[Byte] = if (keytab.isEmpty && existingDtSecret.isEmpty) { +createDelegationTokens() + } else { +null + } + + private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_)) + + private def dtSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-delegation-tokens" + + private def ktSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-kerberos-keytab" + + private def hasKerberosConf: Boolean = krb5CMap.isDefined | krb5File.isDefined + + override def configurePod(original: SparkPod): SparkPod = { +original.transform { case pod if hasKerberosConf => + val configMapVolume = if (krb5CMap.isDefined) { +new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() +.withName(krb5CMap.get) +.endConfigMap() + .build() + } else { +val krb5Conf = new File(krb5File.get) +new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() + .withName(kubernetesConf.krbConfigMapName) + .withItems(new KeyToPathBuilder() +.withKey(krb5Conf.getName()) +.withPath(krb5Conf.getName()) +.build()) + .endConfigMap() + .build() + } + + val podWithVolume = new PodBuilder(pod.pod) +.editSpec() + .addNewVolumeLike(configMapVolume) +.endVolume() + .endSpec() +.build() + + val containerWithMount = new ContainerBuilder(pod.container) +.addNewVolumeMount() + .withName(KRB_FILE_VOLUME) + .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf") + .withSubPath("krb5.conf") + .endVolumeMount() +.build() + + SparkPod(podWithVolume, containerWithMount) +}.transform { + case pod if needKeytabUpload => +// If keytab is defined and is a submission-local file (not local: URI), then create a +// secret for
[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r233648073 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala --- @@ -69,97 +72,191 @@ private[spark] class KerberosConfDriverFeatureStep( "If a Kerberos keytab is specified you must also specify a Kerberos principal") KubernetesUtils.requireBothOrNeitherDefined( -existingSecretName, -existingSecretItemKey, +existingDtSecret, +existingDtItemKey, "If a secret data item-key where the data of the Kerberos Delegation Token is specified" + " you must also specify the name of the secret", "If a secret storing a Kerberos Delegation Token is specified you must also" + " specify the item-key where the data is stored") - private val hadoopConfigurationFiles = hadoopConfDirSpec.hadoopConfDir.map { hConfDir => -HadoopBootstrapUtil.getHadoopConfFiles(hConfDir) + if (!hasKerberosConf) { +logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " + + "Make sure that you have the krb5.conf locally on the driver image.") } - private val newHadoopConfigMapName = -if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) { - Some(kubernetesConf.hadoopConfigMapName) -} else { - None -} - // Either use pre-existing secret or login to create new Secret with DT stored within - private val kerberosConfSpec: Option[KerberosConfigSpec] = (for { -secretName <- existingSecretName -secretItemKey <- existingSecretItemKey - } yield { -KerberosConfigSpec( - dtSecret = None, - dtSecretName = secretName, - dtSecretItemKey = secretItemKey, - jobUserName = kubeTokenManager.getCurrentUser.getShortUserName) - }).orElse( -if (isKerberosEnabled) { - Some(HadoopKerberosLogin.buildSpec( -conf, -kubernetesConf.appResourceNamePrefix, -kubeTokenManager)) -} else { - None + // Create delegation tokens if needed. This is a lazy val so that it's not populated + // unnecessarily. 
But it needs to be accessible to different methods in this class, + // since it's not clear based solely on available configuration options that delegation + // tokens are needed when other credentials are not available. + private lazy val delegationTokens: Array[Byte] = if (keytab.isEmpty && existingDtSecret.isEmpty) { +createDelegationTokens() + } else { +null + } + + private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_)) + + private def dtSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-delegation-tokens" + + private def ktSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-kerberos-keytab" + + private def hasKerberosConf: Boolean = krb5CMap.isDefined | krb5File.isDefined + + override def configurePod(original: SparkPod): SparkPod = { +original.transform { case pod if hasKerberosConf => + val configMapVolume = if (krb5CMap.isDefined) { +new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() +.withName(krb5CMap.get) +.endConfigMap() + .build() + } else { +val krb5Conf = new File(krb5File.get) +new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() + .withName(kubernetesConf.krbConfigMapName) + .withItems(new KeyToPathBuilder() +.withKey(krb5Conf.getName()) +.withPath(krb5Conf.getName()) +.build()) + .endConfigMap() + .build() + } + + val podWithVolume = new PodBuilder(pod.pod) +.editSpec() + .addNewVolumeLike(configMapVolume) +.endVolume() + .endSpec() +.build() + + val containerWithMount = new ContainerBuilder(pod.container) +.addNewVolumeMount() + .withName(KRB_FILE_VOLUME) + .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf") + .withSubPath("krb5.conf") + .endVolumeMount() +.build() + + SparkPod(podWithVolume, containerWithMount) +}.transform { + case pod if needKeytabUpload => +// If keytab is defined and is a submission-local file (not local: URI), then create a +// secret for
[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r233642625 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala --- @@ -144,6 +144,10 @@ private[spark] class BasicExecutorFeatureStep( .addToLimits("memory", executorMemoryQuantity) .addToRequests("cpu", executorCpuQuantity) .endResources() +.addNewEnv() + .withName(ENV_SPARK_USER) --- End diff -- I see that you noted that this is always done across resource managers. Just wondering, what is the reason for that? I ask because I introduced it exclusively in the HadoopSteps. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r233588339 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DelegationTokenFeatureStep.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, SecretBuilder} +import org.apache.commons.codec.binary.Base64 +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.config._ + +/** + * Delegation token support for Spark apps on kubernetes. + * + * When preparing driver resources, this step will generate delegation tokens for the app if + * they're needed. 
+ * + * When preparing pods, this step will mount the delegation token secret (either pre-defined, + * or generated by this step when preparing the driver). + */ +private[spark] class DelegationTokenFeatureStep(conf: KubernetesConf[_], isDriver: Boolean) --- End diff -- This is what I meant above when I said that the `HadoopKerberosLogin` logic could be deleted. The assumption here is that the secret should not be created as the keytab will use the HadoopDelegationTokenManager logic. The only secret that should be _created_ would be the keytab. However, I personally thought that we should point to a secretName that is either the delegationToken or the keytab. Hence why I suggested that the secretName and secretItemKey remain. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r233542062 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala --- @@ -18,7 +18,30 @@ package org.apache.spark.deploy.k8s import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder} -private[spark] case class SparkPod(pod: Pod, container: Container) +private[spark] case class SparkPod(pod: Pod, container: Container) { + + /** + * Convenience method to apply a series of chained transformations to a pod. + * + * Use it like: + * + * original.modify { case pod => + * // update pod and return new one + * }.modify { case pod => + * // more changes that create a new pod + * }.modify { + * case pod if someCondition => // new pod + * } + * + * This makes it cleaner to apply multiple transformations, avoiding having to create + * a bunch of awkwardly-named local variables. Since the argument is a partial function, + * it can do matching without needing to exhaust all the possibilities. If the function + * is not applied, then the original pod will be kept. + */ + def transform(fn: PartialFunction[SparkPod, SparkPod]): SparkPod = fn.lift(this).getOrElse(this) --- End diff -- I would think that this change is out of the scope of this PR, but I do love the use of a PartialFunction here. Thanks for this! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23017: [WIP][SPARK-26015][K8S] Set a default UID for Spa...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/23017#discussion_r233537461 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh --- @@ -30,6 +30,10 @@ set -e # If there is no passwd entry for the container UID, attempt to create one if [ -z "$uidentry" ] ; then if [ -w /etc/passwd ] ; then +# TODO Should we allow providing an environment variable to set the desired username? --- End diff -- You can run the user code with a securityContext where you can specify runAs: {UID}, but without root you are unable to run useradd commands which would be crucial for said feature. Kubernetes defaults the security context to be root. Also, is there a security problem with running as root in an isolated container? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22760: [SPARK-25751][K8S][TEST] Unit Testing for Kerberos Suppo...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22760 gentle ping @mccheah :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r231359962 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -123,7 +126,11 @@ private[spark] class KubernetesClusterSchedulerBackend( } override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { -new KubernetesDriverEndpoint(rpcEnv, properties) +new KubernetesDriverEndpoint(sc.env.rpcEnv, properties) + } + + override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = { +Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration)) --- End diff -- Yeah, I can always throw up a follow-up for that. No worries --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r231344398 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -123,7 +126,11 @@ private[spark] class KubernetesClusterSchedulerBackend( } override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { -new KubernetesDriverEndpoint(rpcEnv, properties) +new KubernetesDriverEndpoint(sc.env.rpcEnv, properties) + } + + override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = { +Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration)) --- End diff -- Oh, I was referencing the creation of `Delegation Token` secret when a `--keytab` is specified. I believe that you are right in that in client-mode you would not need to worry about running this step. But I think the 3rd option would be good to include here. I think that with the introduction of `HadoopDelegationTokenManager` we should remove the creation of the `dtSecret`, and that should be included in this PR if you are introducing this. Therefore, I think it is sensible to refactor the `KerberosConfigSpec` to have a generic `secret`, `secretName`, `secretKey`, that would either contain a `DelegationToken` or a `keytab`. Such that the code block: ``` private val kerberosConfSpec: Option[KerberosConfigSpec] = (for { secretName <- existingSecretName secretItemKey <- existingSecretItemKey } yield { KerberosConfigSpec( secret = None, secretName = secretName, secretItemKey = secretItemKey, jobUserName = kubeTokenManager.getCurrentUser.getShortUserName) }).orElse( if (isKerberosEnabled) { keytab.map { . } } else { None } ``` would return a kerberosConfSpec that would account for either case. Erm, that would also mean that you could delete the `HadoopKerberosLogin` method. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22760: [SPARK-25751][K8S][TEST] Unit Testing for Kerbero...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22760#discussion_r230951222 --- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala --- @@ -63,4 +63,66 @@ object KubernetesFeaturesTestUtils { def containerHasEnvVar(container: Container, envVarName: String): Boolean = { container.getEnv.asScala.exists(envVar => envVar.getName == envVarName) } + + def containerHasEnvVars(container: Container, envs: Map[String, String]): Unit = { +assertHelper[Set[(String, String)]](envs.toSet, + container.getEnv.asScala +.map { e => (e.getName, e.getValue) }.toSet, + subsetOfTup[Set[(String, String)], String], "a subset of") + } + + def containerHasVolumeMounts(container: Container, vms: Map[String, String]): Unit = { +assertHelper[Set[(String, String)]](vms.toSet, + container.getVolumeMounts.asScala +.map { vm => (vm.getName, vm.getMountPath) }.toSet, + subsetOfTup[Set[(String, String)], String], "a subset of") + } + + def podHasLabels(pod: Pod, labels: Map[String, String]): Unit = { +assertHelper[Set[(String, String)]](labels.toSet, pod.getMetadata.getLabels.asScala.toSet, + subsetOfTup[Set[(String, String)], String], "a subset of") + } + + def podHasVolumes(pod: Pod, volumes: Seq[Volume]): Unit = { +assertHelper[Set[Volume]](volumes.toSet, pod.getSpec.getVolumes.asScala.toSet, + subsetOfElem[Set[Volume], Volume], "a subset of") + } + + // Mocking bootstrapHadoopConfDir + def hadoopConfBootPod(inputPod: SparkPod): SparkPod = +SparkPod( + new PodBuilder(inputPod.pod) +.editOrNewMetadata() + .addToLabels("bootstrap-hconf", "true") + .endMetadata() +.build(), + inputPod.container) + + // Mocking bootstrapKerberosPod + def krbBootPod(inputPod: SparkPod): SparkPod = +SparkPod( + new PodBuilder(inputPod.pod) +.editOrNewMetadata() + .addToLabels("bootstrap-kerberos", "true") + .endMetadata() +.build(), + inputPod.container) + + // Mocking bootstrapSparkUserPod + 
def userBootPod(inputPod: SparkPod): SparkPod = +SparkPod( + new PodBuilder(inputPod.pod) +.editOrNewMetadata() + .addToLabels("bootstrap-user", "true") + .endMetadata() +.build(), + inputPod.container) + + def subsetOfElem[T <: Set[B], B <: Any]: (T, T) => Boolean = (a, b) => a.subsetOf(b) + def subsetOfTup[T <: Set[(B, B)], B <: Any]: (T, T) => Boolean = (a, b) => a.subsetOf(b) + + def assertHelper[T](con1: T, con2: T, + expr: (T, T) => Boolean = (a: T, b: T) => a == b, exprMsg: String = "equal to"): Unit = { +assert(expr(con1, con2), s"$con1 is not $exprMsg $con2 as expected") --- End diff -- I thought it would be better than doing a custom string at every assert statement. I don't find it to be too awkward, but *shrug* :) I actually kind of like it since we could do a check with any expression --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22911: [SPARK-25815][k8s] Support kerberos in client mode, keyt...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22911 > But whether to store it in secrets is not a question. You either store it in a secret or you don't support the keytab/principal feature in Spark at all, and we can delete a bunch of code here. But can we, for a specific mode (client or cluster), tune whether the keytab is put into a secret? Right now, if a keytab is specified, a secret is created regardless. I do wish to support keytab/principal, so I think that storing it in secrets should be fine. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22911: [SPARK-25815][k8s] Support kerberos in client mod...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r230860519 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -123,7 +126,11 @@ private[spark] class KubernetesClusterSchedulerBackend( } override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { -new KubernetesDriverEndpoint(rpcEnv, properties) +new KubernetesDriverEndpoint(sc.env.rpcEnv, properties) + } + + override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = { +Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration)) --- End diff -- If we are introducing this change, I think it is important that we talk about the future of secret creation when using `--keytab` + `--principal`. Right now, secrets are created when a keytab is used by the client or, in client mode, by the driver. This was used primarily for testing (on my end), but also because this logic wasn't previously generalized for all cluster managers. Should we create an option for the user to create a secret, or get rid of it as a whole, given that delegation token logic is handled via the UpdateDelegationToken message-passing framework? In essence, if we keep the ability to create a secret, we obtain a DT twice, which is redundant. And if we remove it, it is sensible to refactor the KerberosConfig logic to account for this removal. I was planning to do this in my token renewal PR, where I was also introducing this change, but it seems that this PR will probably get merged before mine, so this would be a better place for the refactor. Alternatively, a separate PR could introduce this line and do the refactor, and then this PR and mine could follow. Thoughts, @vanzin? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22915: [SPARK-25825][K8S][WIP] Enable token renewal for ...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22915#discussion_r230134430 --- Diff: docs/security.md --- @@ -798,6 +782,50 @@ achieved by setting `spark.kubernetes.hadoop.configMapName` to a pre-existing Co local:///opt/spark/examples/jars/spark-examples_.jar \ ``` + +## Long-Running Applications + +Long-running applications may run into issues if their run time exceeds the maximum delegation +token lifetime configured in services it needs to access. + +Spark supports automatically creating new tokens for these applications when running in YARN, Mesos, and Kubernetes modes. +If one wishes to launch the renewal thread in the Driver, Kerberos credentials need to be provided to the Spark application +via the `spark-submit` command, using the `--principal` and `--keytab` parameters. + +The provided keytab will be copied over to the machine running the Application Master via the Hadoop +Distributed Cache. For this reason, it's strongly recommended that both YARN and HDFS be secured +with encryption, at least. + +The Kerberos login will be periodically renewed using the provided credentials, and new delegation +tokens for supported will be created. + + Long-Running Kerberos in Kubernetes + +This section addresses the additional feature added uniquely to Kubernetes. If you are running an external token service --- End diff -- The problem is that such a service can run in a variety of ways, so I thought it was a matter of defining what the resulting secret would look like. We wrote an example external service in our deprecated fork to show how such a service would function: https://github.com/apache-spark-on-k8s/spark/pull/453. In essence, using a service keytab, it should acquire delegation tokens bound to the job user's principal and place the contents in the secret as a new data item. For us internally, and for other companies running their own unique external renewal services,
implementations might vary, but I just want to have a well-defined spec of the resulting secret, so I am just experimenting with a WIP spec below. However, it clearly seems necessary to define how such a service should function as well. Would that be sufficient? Sadly, that would still be a bit hand-wavy. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22911: [SPARK-25815][k8s] Support kerberos in client mode, keyt...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22911 Just a note: this PR has some overlap with https://github.com/apache/spark/pull/22915. After a brief skim, I am a bit hesitant about storing keytabs in secrets, but it might have to be done for client-mode support. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22915: [SPARK-25825][K8S][WIP] Enable token renewal for ...
GitHub user ifilonenko opened a pull request: https://github.com/apache/spark/pull/22915 [SPARK-25825][K8S][WIP] Enable token renewal for both --keytab and tokenSecret ## What changes were proposed in this pull request? Enabled token renewal when specifying `--keytab` or (`spark.kubernetes.kerberos.tokenSecret.renewal` + `spark.kubernetes.kerberos.tokenSecret.name`) for Kerberos on Kubernetes ## How was this patch tested? Unit and Integration tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/ifilonenko/spark SPARK-25825 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22915.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22915 commit 92b61e0841d1d33755fbc63bd28e5ffc1597416c Author: Ilan Filonenko Date: 2018-10-31T20:54:55Z WIP renewal service with specificed contract --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22624 Thank you for this work @vanzin ! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22805: [SPARK-25809][K8S][TEST] New K8S integration testing bac...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22805 @mccheah and @liyinan926 for merge --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22805: [SPARK-25809][K8S][TEST] New K8S integration testing bac...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22805 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22805: [SPARK-25809][K8S][TEST] New K8S integration testing bac...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22805 test this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22608: [SPARK-25750][K8S][TESTS] Kerberos Support Integr...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22608#discussion_r229394011 --- Diff: resource-managers/kubernetes/integration-tests/kerberos-yml/data-populator-deployment.yml --- @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +apiVersion: apps/v1 +kind: Deployment --- End diff -- Yeah, but I am aware that it fits better as a `Job`. (The `hdfs -cp` that is being run is the same file, so if it is re-run it doesn't do any harm.) But if it is a preference to use `Job` I don't see any problems with that as it makes sense --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22608: [SPARK-25750][K8S][TESTS] Kerberos Support Integr...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22608#discussion_r229391389 --- Diff: bin/docker-image-tool.sh --- @@ -53,7 +53,7 @@ function build { # contain a lot of duplicated jars with the main Spark directory. In a proper distribution, # the examples directory is cleaned up before generating the distribution tarball, so this # issue does not occur. -IMG_PATH=resource-managers/kubernetes/docker/src/main/dockerfiles +IMG_PATH=resource-managers/kubernetes/docker/src --- End diff -- The dockerfiles and files for building the kerberos/ hadoop docker images are in `src/test`. It still seemed like a logical place to keep them with the `/test` tag, no? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22608: [SPARK-25750][K8S][TESTS] Kerberos Support Integr...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22608#discussion_r229386227 --- Diff: resource-managers/kubernetes/integration-tests/kerberos-yml/data-populator-deployment.yml --- @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +apiVersion: apps/v1 +kind: Deployment --- End diff -- This is purely because the job sometimes fails due to networking issues or whatnot and I wanted to counter the flakiness of the test suite by hardening it behind a deployment --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22760: [SPARK-25751][K8S][TEST] Unit Testing for Kerberos Suppo...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22760 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22760: [SPARK-25751][K8S][TEST] Unit Testing for Kerberos Suppo...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22760 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19045: [WIP][SPARK-20628][CORE][K8S] Keep track of nodes...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/19045#discussion_r229012644 --- Diff: resource-managers/kubernetes/integration-tests/tests/decomissioning_water.py --- @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import print_function + +import sys +import time + +from pyspark.sql import SparkSession + + +if __name__ == "__main__": +""" +Usage: decomissioning_water +""" +spark = SparkSession \ +.builder \ +.appName("PyMemoryTest") \ +.getOrCreate() +sc = spark.SparkContext +rdd = sc.parallelize(range(10)) +rdd.collect() +time.sleep(15) --- End diff -- Personally, might be better to have the `15` be an `args` so that the tester can tune this based on how long they want to sleep. But I am indifferent to this small NIT --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19045: [WIP][SPARK-20628][CORE][K8S] Keep track of nodes...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/19045#discussion_r229011769 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala --- @@ -242,12 +243,19 @@ private[spark] class KubernetesSuite extends SparkFunSuite action match { case Action.ADDED | Action.MODIFIED => execPods(name) = resource + // If testing decomissioning delete the node 10 seconds after + if (decomissioningTest) { +Thread.sleep(1000) --- End diff -- Why the thread sleep? Why not just check if the `getStatus` is running and then kill? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19045: [WIP][SPARK-20628][CORE][K8S] Keep track of nodes...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/19045#discussion_r229011136 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.deploy.k8s.integrationtest.TestConfig.{getTestImageRepo, getTestImageTag} + +private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite => --- End diff -- You need to extend `with DecommissionSuite` in `KubernetesSuite.scala` to launch the tests --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22760: [SPARK-25751][K8S][TEST] Unit Testing for Kerberos Suppo...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22760 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22608: [SPARK-25750][K8S][TESTS] Kerberos Support Integration T...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22608 > Just noticed this, but could you open a separate bug for adding these tests, instead of re-using the one where the main code was added? It's a large enough thing that it should be a separate thing. I had https://issues.apache.org/jira/browse/SPARK-25750 and linked this PR to that JIRA issue. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22820: [SPARK-25828][K8S] Bumping Kubernetes-Client version to ...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22820 @felixcheung and @erikerlandson for merge --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22624#discussion_r228605725 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala --- @@ -18,45 +18,20 @@ package org.apache.spark.deploy.k8s.security import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.SparkConf -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.internal.Logging /** - * The KubernetesHadoopDelegationTokenManager fetches Hadoop delegation tokens - * on the behalf of the Kubernetes submission client. The new credentials - * (called Tokens when they are serialized) are stored in Secrets accessible - * to the driver and executors, when new Tokens are received they overwrite the current Secrets. + * Adds Kubernetes-specific functionality to HadoopDelegationTokenManager. */ private[spark] class KubernetesHadoopDelegationTokenManager( --- End diff -- In the code's current state, this class is still needed for mocking in unit tests, and it will be adapted to override the `start()` method in an upcoming PR. So it remains relevant to keep. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22820: [SPARK-25828][K8S] Bumping Kubernetes-Client version to ...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22820 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22760: [SPARK-25751][K8S][TEST] Unit Testing for Kerberos Suppo...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22760 moved bootstrap as recommended in comments @mccheah --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22608: [SPARK-25750][K8S][TESTS] Kerberos Support Integration T...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22608 > Kubernetes integration test status success This new build passes, and because I resolved the docker image issue by building the `hadoop-base` image with each iteration, this PR is now ready for review and hopefully a merge soon :) @vanzin @liyinan926 @felixcheung @mccheah for review --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22608: [SPARK-25750][K8S][TESTS] Kerberos Support Integration T...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22608 @vanzin as per our conversation offline, that is exactly what my current refactor is doing. I will push an update really soon. Thanks for the recommendation! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22820: [SPARK-25828][K8S][BUILD] Bumping Kubernetes-Client vers...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22820 Regarding the manifests that need to be updated in `dev/deps/spark-deps-hadoop-2.7` and `dev/deps/spark-deps-hadoop-3.1`: this most recent commit should fix them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22820: [SPARK-25828][K8S][BUILD] Bumping Kubernetes-Clie...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22820#discussion_r228269911 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala --- @@ -157,7 +157,10 @@ private[spark] object KubernetesUtils { }.getOrElse(Seq(("container state", "N/A"))) } - def formatTime(time: Time): String = { -if (time != null) time.getTime else "N/A" + def formatTime(time: String): String = { --- End diff -- The value previously obtained via `time.getTime` is now passed in directly as a String. I wanted to keep the existing functionality; I can remove it if you deem it unnecessary. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22820: [SPARK-25828][K8S] Bumping Kubernetes-Client vers...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22820#discussion_r228004310 --- Diff: resource-managers/kubernetes/core/pom.xml --- @@ -29,7 +29,7 @@ Spark Project Kubernetes kubernetes -3.0.0 +4.0.0 --- End diff -- hmm, agreed. Maybe 4.1.0 is a more stable version --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22798: [SPARK-25803][K8S] Fix docker-image-tool.sh -n option
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22798 test this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22820: [SPARK-25828][K8S] Bumping Kubernetes-Client version to ...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22820 @erikerlandson @skonto @liyinan926 for review Do you see additional places where we would want to leverage the newer version for optimality? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22820: [SPARK-25828][K8S] Bumping Kubernetes-Client vers...
GitHub user ifilonenko opened a pull request: https://github.com/apache/spark/pull/22820 [SPARK-25828][K8S] Bumping Kubernetes-Client version to 4.0.0 ## What changes were proposed in this pull request? Changed the `kubernetes-client` version and refactored code that broke as a result ## How was this patch tested? Unit and Integration tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/ifilonenko/spark SPARK-25828 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22820.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22820 commit de62f07bcc33f4d0e995b468c49a8301a99d60d0 Author: Ilan Filonenko Date: 2018-10-25T00:24:16Z bump to 4.0.0 for kubernetes-client version --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22810: [SPARK-24516][K8S] Change Python default to Python3
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22810 or @holdenk for merge :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22624#discussion_r227984918 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -17,76 +17,175 @@ package org.apache.spark.deploy.security +import java.io.File +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.AtomicReference + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.ui.UIUtils +import org.apache.spark.util.ThreadUtils /** - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]], - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not - * explicitly disabled. + * Manager for delegation tokens in a Spark application. + * + * This manager has two modes of operation: + * + * 1. When configured with a principal and a keytab, it will make sure long-running apps can run + * without interruption while accessing secured services. It periodically logs in to the KDC with + * user-provided credentials, and contacts all the configured secure services to obtain delegation + * tokens to be distributed to the rest of the application. 
+ * + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op + * when the relogin is not yet needed. The check period can be overridden in the configuration. * - * Also, each HadoopDelegationTokenProvider is controlled by - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be - * enabled/disabled by the configuration spark.security.credentials.hive.enabled. + * New delegation tokens are created once 75% of the renewal interval of the original tokens has + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM. + * The driver is tasked with distributing the tokens to other processes that might need them. * - * @param sparkConf Spark configuration - * @param hadoopConf Hadoop configuration - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems. + * 2. When operating without an explicit principal and keytab, token renewal will not be available. + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark + * driver, but the app will not get new tokens when those expire. + * + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens` + * method. This option does not require calling the `start` method, but leaves it up to the + * caller to distribute the tokens that were generated. 
*/ private[spark] class HadoopDelegationTokenManager( -sparkConf: SparkConf, -hadoopConf: Configuration, -fileSystems: Configuration => Set[FileSystem]) - extends Logging { +protected val sparkConf: SparkConf, +protected val hadoopConf: Configuration) extends Logging { private val deprecatedProviderEnabledConfigs = List( "spark.yarn.security.tokens.%s.enabled", "spark.yarn.security.credentials.%s.enabled") private val providerEnabledConfig = "spark.security.credentials.%s.enabled" - // Maintain all the registered delegation token providers - private val delegationTokenProviders = getDelegationTokenProviders + private val principal = sparkConf.get(PRINCIPAL).orNull + private val keytab = sparkConf.get(KEYTAB).orNull + + if (principal != null) { +require(keytab != null, "Kerberos principal specified without a keytab.") +require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.") + } + + private val delegationTokenProviders = loadProviders() logDebug("Us
[GitHub] spark issue #22608: [SPARK-25750][K8S][TESTS] Kerberos Support Integration T...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22608 Gentle ping for further reviews :) as I would love to have this merged and running in Jenkins :) I would also appreciate recommendations on how to include the `hadoop-<version>.tgz` in the distribution for building the `hadoop-base` docker image, as that is crucial for the Hadoop cluster. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22810: [SPARK-24516][K8S] Change Python default to Python3
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22810 @felixcheung for merge and @kokes for opinion and raising this JIRA --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22810: [SPARK-24516][K8S] Change Python default to Pytho...
GitHub user ifilonenko opened a pull request: https://github.com/apache/spark/pull/22810 [SPARK-24516][K8S] Change Python default to Python3 ## What changes were proposed in this pull request? As this is targeted for 3.0.0 and Python 2 reaches end of life on Jan 1st, 2020, I feel it is appropriate to change the default to Python 3, especially as the projects [found here](https://python3statement.org/) are dropping their Python 2 support. ## How was this patch tested? Unit and Integration tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/ifilonenko/spark SPARK-24516 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22810.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22810 commit ab03cc4753b5628144ef4d772a262b4890ddc36c Author: Ilan Filonenko Date: 2018-10-23T23:24:25Z change default to python3 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22798: [SPARK-25803] Fix docker-image-tool.sh -n option
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22798 Can you add a [K8S] tag to this PR's title, as it is related to the Kubernetes code? Otherwise, this change looks good to me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22805: [WIP][SPARK-25809][K8S][TEST] New K8S integration...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22805#discussion_r227527156 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala --- @@ -30,14 +30,16 @@ private[spark] trait IntegrationTestBackend { private[spark] object IntegrationTestBackendFactory { val deployModeConfigKey = "spark.kubernetes.test.deployMode" + val backendMinikube = "minikube" + val backendDockerForDesktop = "docker-for-desktop" + def getTestBackend: IntegrationTestBackend = { val deployMode = Option(System.getProperty(deployModeConfigKey)) - .getOrElse("minikube") -if (deployMode == "minikube") { - MinikubeTestBackend -} else { - throw new IllegalArgumentException( -"Invalid " + deployModeConfigKey + ": " + deployMode) + .getOrElse(backendMinikube) +deployMode match { + case `backendMinikube` => MinikubeTestBackend + case `backendDockerForDesktop` => DockerForDesktopBackend --- End diff -- Can you add a README to explain how to run with docker-for-desktop, by specifying --deploy-mode --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22805: [WIP][SPARK-25809][K8S][TEST] New K8S integration...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22805#discussion_r227499598 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/docker/DockerForDesktopBackend.scala --- @@ -0,0 +1,68 @@ +package org.apache.spark.deploy.k8s.integrationtest.backend.docker + +import java.nio.file.Paths + +import io.fabric8.kubernetes.client.{Config, DefaultKubernetesClient} +import org.apache.spark.deploy.k8s.integrationtest.ProcessUtils +import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend + +private[spark] object DockerForDesktopBackend extends IntegrationTestBackend { + + private val KUBECTL_STARTUP_TIMEOUT_SECONDS = 15 + + private var defaultClient: DefaultKubernetesClient = _ + private var initialContext = "" + + private def getCurrentContext: String = { +val outputs = executeKubectl("config", "current-context") +assert(outputs.size == 1, "Unexpected amount of output from kubectl config current-context") +outputs.head + } + + private def setContext(context: String): Unit = { +val outputs = executeKubectl("config", "use-context", context) +assert(outputs.size == 1, "Unexpected amount of output from kubectl config use-context") +val errors = outputs.filter(_.startsWith("error")) +assert(errors.size == 0, s"Received errors from kubectl: ${errors.head}") + } + + override def initialize(): Unit = { +// Switch context if necessary +// TODO: If we were using Fabric 8 client 3.1.0 then we could +// instead just use the overload of autoConfigure() that takes the +// desired context avoiding the need to interact with kubectl at all +initialContext = getCurrentContext --- End diff -- I would definitely not be opposed. This work will cause a bit of overhead in little refactorings all over the code-base tho, but shouldn't be a problem. I had already gotten started with that, but if you or @skonto want to take over, that would be great. 
This is especially appealing given that it would remove the need to use `kubectl`. +1 on this --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22805: [WIP][SPARK-25809][K8S][TEST] New K8S integration...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22805#discussion_r227526473 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala --- @@ -30,14 +30,16 @@ private[spark] trait IntegrationTestBackend { private[spark] object IntegrationTestBackendFactory { val deployModeConfigKey = "spark.kubernetes.test.deployMode" + val backendMinikube = "minikube" + val backendDockerForDesktop = "docker-for-desktop" + def getTestBackend: IntegrationTestBackend = { val deployMode = Option(System.getProperty(deployModeConfigKey)) - .getOrElse("minikube") -if (deployMode == "minikube") { - MinikubeTestBackend -} else { - throw new IllegalArgumentException( -"Invalid " + deployModeConfigKey + ": " + deployMode) + .getOrElse(backendMinikube) +deployMode match { + case `backendMinikube` => MinikubeTestBackend + case `backendDockerForDesktop` => DockerForDesktopBackend + case _ => throw new IllegalArgumentException("Invalid " + deployModeConfigKey + ": " + deployMode) --- End diff -- NIT: line too long, split --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22624#discussion_r227075160 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -17,76 +17,175 @@ package org.apache.spark.deploy.security +import java.io.File +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.AtomicReference + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.ui.UIUtils +import org.apache.spark.util.ThreadUtils /** - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]], - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not - * explicitly disabled. + * Manager for delegation tokens in a Spark application. + * + * This manager has two modes of operation: + * + * 1. When configured with a principal and a keytab, it will make sure long-running apps can run + * without interruption while accessing secured services. It periodically logs in to the KDC with + * user-provided credentials, and contacts all the configured secure services to obtain delegation + * tokens to be distributed to the rest of the application. 
+ * + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op + * when the relogin is not yet needed. The check period can be overridden in the configuration. * - * Also, each HadoopDelegationTokenProvider is controlled by - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be - * enabled/disabled by the configuration spark.security.credentials.hive.enabled. + * New delegation tokens are created once 75% of the renewal interval of the original tokens has + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM. + * The driver is tasked with distributing the tokens to other processes that might need them. * - * @param sparkConf Spark configuration - * @param hadoopConf Hadoop configuration - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems. + * 2. When operating without an explicit principal and keytab, token renewal will not be available. + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark + * driver, but the app will not get new tokens when those expire. + * + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens` + * method. This option does not require calling the `start` method, but leaves it up to the + * caller to distribute the tokens that were generated. 
*/ private[spark] class HadoopDelegationTokenManager( -sparkConf: SparkConf, -hadoopConf: Configuration, -fileSystems: Configuration => Set[FileSystem]) - extends Logging { +protected val sparkConf: SparkConf, +protected val hadoopConf: Configuration) extends Logging { private val deprecatedProviderEnabledConfigs = List( "spark.yarn.security.tokens.%s.enabled", "spark.yarn.security.credentials.%s.enabled") private val providerEnabledConfig = "spark.security.credentials.%s.enabled" - // Maintain all the registered delegation token providers - private val delegationTokenProviders = getDelegationTokenProviders + private val principal = sparkConf.get(PRINCIPAL).orNull + private val keytab = sparkConf.get(KEYTAB).orNull + + if (principal != null) { +require(keytab != null, "Kerberos principal specified without a keytab.") +require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.") + } + + private val delegationTokenProviders = loadProviders() logDebug("Us
[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22624#discussion_r227064934 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -17,76 +17,175 @@ package org.apache.spark.deploy.security +import java.io.File +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.AtomicReference + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.ui.UIUtils +import org.apache.spark.util.ThreadUtils /** - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]], - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not - * explicitly disabled. + * Manager for delegation tokens in a Spark application. + * + * This manager has two modes of operation: + * + * 1. When configured with a principal and a keytab, it will make sure long-running apps can run + * without interruption while accessing secured services. It periodically logs in to the KDC with + * user-provided credentials, and contacts all the configured secure services to obtain delegation + * tokens to be distributed to the rest of the application. 
+ * + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op + * when the relogin is not yet needed. The check period can be overridden in the configuration. * - * Also, each HadoopDelegationTokenProvider is controlled by - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be - * enabled/disabled by the configuration spark.security.credentials.hive.enabled. + * New delegation tokens are created once 75% of the renewal interval of the original tokens has + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM. + * The driver is tasked with distributing the tokens to other processes that might need them. * - * @param sparkConf Spark configuration - * @param hadoopConf Hadoop configuration - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems. + * 2. When operating without an explicit principal and keytab, token renewal will not be available. + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark + * driver, but the app will not get new tokens when those expire. + * + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens` + * method. This option does not require calling the `start` method, but leaves it up to the + * caller to distribute the tokens that were generated. 
*/ private[spark] class HadoopDelegationTokenManager( -sparkConf: SparkConf, -hadoopConf: Configuration, -fileSystems: Configuration => Set[FileSystem]) - extends Logging { +protected val sparkConf: SparkConf, +protected val hadoopConf: Configuration) extends Logging { private val deprecatedProviderEnabledConfigs = List( "spark.yarn.security.tokens.%s.enabled", "spark.yarn.security.credentials.%s.enabled") private val providerEnabledConfig = "spark.security.credentials.%s.enabled" - // Maintain all the registered delegation token providers - private val delegationTokenProviders = getDelegationTokenProviders + private val principal = sparkConf.get(PRINCIPAL).orNull + private val keytab = sparkConf.get(KEYTAB).orNull + + if (principal != null) { +require(keytab != null, "Kerberos principal specified without a keytab.") +require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.") + } + + private val delegationTokenProviders = loadProviders() logDebug("Us
[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22624#discussion_r227062715 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -17,76 +17,175 @@ package org.apache.spark.deploy.security +import java.io.File +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.AtomicReference + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.ui.UIUtils +import org.apache.spark.util.ThreadUtils /** - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]], - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not - * explicitly disabled. + * Manager for delegation tokens in a Spark application. + * + * This manager has two modes of operation: + * + * 1. When configured with a principal and a keytab, it will make sure long-running apps can run + * without interruption while accessing secured services. It periodically logs in to the KDC with + * user-provided credentials, and contacts all the configured secure services to obtain delegation + * tokens to be distributed to the rest of the application. 
+ * + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op + * when the relogin is not yet needed. The check period can be overridden in the configuration. * - * Also, each HadoopDelegationTokenProvider is controlled by - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be - * enabled/disabled by the configuration spark.security.credentials.hive.enabled. + * New delegation tokens are created once 75% of the renewal interval of the original tokens has + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM. + * The driver is tasked with distributing the tokens to other processes that might need them. * - * @param sparkConf Spark configuration - * @param hadoopConf Hadoop configuration - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems. + * 2. When operating without an explicit principal and keytab, token renewal will not be available. + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark + * driver, but the app will not get new tokens when those expire. + * + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens` + * method. This option does not require calling the `start` method, but leaves it up to the + * caller to distribute the tokens that were generated. 
*/ private[spark] class HadoopDelegationTokenManager( -sparkConf: SparkConf, -hadoopConf: Configuration, -fileSystems: Configuration => Set[FileSystem]) - extends Logging { +protected val sparkConf: SparkConf, +protected val hadoopConf: Configuration) extends Logging { private val deprecatedProviderEnabledConfigs = List( "spark.yarn.security.tokens.%s.enabled", "spark.yarn.security.credentials.%s.enabled") private val providerEnabledConfig = "spark.security.credentials.%s.enabled" - // Maintain all the registered delegation token providers - private val delegationTokenProviders = getDelegationTokenProviders + private val principal = sparkConf.get(PRINCIPAL).orNull + private val keytab = sparkConf.get(KEYTAB).orNull + + if (principal != null) { +require(keytab != null, "Kerberos principal specified without a keytab.") +require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.") + } + + private val delegationTokenProviders = loadProviders() logDebug("Us
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r227060819 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ +import org.apache.spark.internal.Logging +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + import StreamingCompatibilitySuite._ + + test("Run spark streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +val driverService = driverServiceSetup +try { + setupSparkStreamingPod(driverService.getMetadata.getName) +.addToArgs("streaming.NetworkWordCount") +.addToArgs(host, port.toString) +.endContainer() +.endSpec() +.done() + Eventually.eventually(TIMEOUT, INTERVAL) { +assert(getRunLog.contains("spark-streaming-kube"), "The application did not complete.") + } +} finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents +.kubernetesClient +.services() +.inNamespace(kubernetesTestComponents.namespace) +.delete(driverService) + serverSocket.close() +} + } + + test("Run spark streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +mainClass = "org.apache.spark.examples.streaming.NetworkWordCount", +appArgs = Array[String](host, port.toString), +expectedJVMValue = Seq("spark-streaming-kube")) +} finally { + serverSocket.close() +} + } + + test("Run spark structured streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +mainClass = 
"org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount", +appArgs = Array[String](host, port.toString), +expectedJVMValue = Seq("spark-streaming-kube")) +} finally { + serverSocket.close() +} + } + + test("Run spark structured streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +val driverService = driverServiceSetup +try { + setupSparkStreamingPod(driverService.getMetadata.getName) +.addToArgs("sql.streaming.StructuredNetworkWordCount") +.addToArgs(host, port.toString) +.endContainer() +.endSpec() +.done() + + val TIMEOUT = PatienceConfiguration.Timeout(Span(3, Minutes)) + Eventually.eventually(TIMEOUT, INTERVAL) { +assert(getRunLog.contains("spark-streaming-kube"), + "The application did not complete.") + } +} +finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents +.kubernetesClient +.services()
[GitHub] spark pull request #22777: [SPARK-25299][K8S][WIP] Enabling Remote Shuffle B...
Github user ifilonenko closed the pull request at: https://github.com/apache/spark/pull/22777 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22777: [SPARK-25299][K8S][WIP] Enabling Remote Shuffle B...
GitHub user ifilonenko opened a pull request: https://github.com/apache/spark/pull/22777 [SPARK-25299][K8S][WIP] Enabling Remote Shuffle Backup ## What changes were proposed in this pull request? TODO ## How was this patch tested? TODO You can merge this pull request into a Git repository by running: $ git pull https://github.com/ifilonenko/spark SPARK-25299-experimental Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22777.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22777 commit 73d3ac7d0dd876d436aa5e6723dbd75952f5a93c Author: mcheah Date: 2018-10-02T23:21:59Z Initial experiment for backing up shuffle files. Instead of expecting the shuffle service to just pick up the data written to local disk, executors can upload the data explicitly to the shuffle service. Shuffle services do not need to be colocated with executors this way. This can extend to arbitrary numbers of replicas. commit 8281e1b7d736a72d1e3bbb9114136380bf5c4488 Author: mcheah Date: 2018-10-10T16:54:14Z merge conflicts commit 1aa6d656e0e38099e576f8de4550c0d8fcace4ca Author: Ilan Filonenko Date: 2018-10-19T22:44:25Z fixing issue of client connections and refactoring for heartbeat --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22748: [SPARK-25745][K8S] Improve docker-image-tool.sh script
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22748 @vanzin I think it is related to having access to trigger the k8s builds; I (or @shaneknapp) can add you if you would like. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22748: [SPARK-25745][K8S] Improve docker-image-tool.sh script
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22748 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22704: [SPARK-25681][K8S][WIP] Leverage a config to tune...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22704#discussion_r226127311 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala --- @@ -49,8 +49,11 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fsToGetTokens, creds) // Get the token renewal interval if it is not set. It will only be called once. -if (tokenRenewalInterval == null) { - tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens) +// If running a Kerberos job on Kubernetes, you may specify that you wish to not +// obtain the tokenRenewal interval, as the renewal service may be external. --- End diff -- > And BTW, I know what you mean when you mention an external renewal service. But again, that does not exist, and until it does, you should not do things that assume its existence. Right, so in Spark there is no token renewal service ATM, but an external token service that places tokens into Secrets may exist within a company's organization, no? That holds whether or not such a service is provided by Spark. So I personally thought that such a comment was sensible. But I'll remove it based on your reasoning. > If you're in this code, there are two options: Ah, very good point: checking for the presence of a keytab / principal as a flag, given that > It has always been, and always will be, the way to tell Spark that you want Spark to renew tokens itself makes sense > The current k8s backend is broken in that regard. The design was specifically for the popular use-case in which for cluster-mode we would not send keytabs around and instead read the DT from a secret. So true, it does break the traditional design because we are using a keytab and not enabling renewal.
Consistent with your work, the next step would be to mirror the other resource managers in allowing the option to use the renewal code if the keytab and principal are already in the Driver. Just out of interest, have there been any cases where the Driver is overworked by running this renewal service and managing the DTs for many executors? In essence, could there be a convincing use case for having the Driver use the keytab for login but not do its own renewal, because it might not be able to handle the load? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22760: [SPARK-25751][K8S][TEST] Unit Testing for Kerberos Suppo...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22760 @liyinan926 @mccheah for review on unit testing --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22760: [SPARK-25751][K8S][TEST] Unit Testing for Kerbero...
GitHub user ifilonenko opened a pull request: https://github.com/apache/spark/pull/22760 [SPARK-25751][K8S][TEST] Unit Testing for Kerberos Support ## What changes were proposed in this pull request? Unit tests for Kerberos support addition ## How was this patch tested? Unit and Integration tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/ifilonenko/spark SPARK-25751 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22760.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22760 commit 47bd9071f88199e4c73b43227626108481d72595 Author: Ilan Filonenko Date: 2018-10-17T22:12:51Z unit tests for all features --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22748: [SPARK-25745][K8S] Improve docker-image-tool.sh script
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22748 There seems to be overlapping logic between this PR and https://github.com/apache/spark/pull/22681 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22146: [SPARK-24434][K8S] pod template files
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22146 Is there any progress on this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22608: [SPARK-25750][K8S][TESTS] Kerberos Support Integr...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22608#discussion_r225714964 --- Diff: bin/docker-image-tool.sh --- @@ -71,18 +71,29 @@ function build { --build-arg base_img=$(image_ref spark) ) - local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"} - local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"} - local RDOCKERFILE=${RDOCKERFILE:-"$IMG_PATH/spark/bindings/R/Dockerfile"} + local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/main/dockerfiles/spark/Dockerfile"} + local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/main/dockerfiles/spark/bindings/python/Dockerfile"} + local KDOCKERFILE=${KDOCKERFILE:-"$IMG_PATH/test/dockerfiles/spark/kerberos/Dockerfile"} --- End diff -- > Yes, but I don't think it's meant for also building images for tests. I don't see why not. A simple flag would toggle its build; would that suffice? Having it all in this script is cleaner, given that `image_ref` is defined in `docker-image-tool.sh`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r225686232 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ +import org.apache.spark.internal.Logging +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + import StreamingCompatibilitySuite._ + + test("Run spark streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +val driverService = driverServiceSetup +try { + setupSparkStreamingPod(driverService.getMetadata.getName) +.addToArgs("streaming.NetworkWordCount") +.addToArgs(host, port.toString) +.endContainer() +.endSpec() +.done() + Eventually.eventually(TIMEOUT, INTERVAL) { +assert(getRunLog.contains("spark-streaming-kube"), "The application did not complete.") + } +} finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents +.kubernetesClient +.services() +.inNamespace(kubernetesTestComponents.namespace) +.delete(driverService) + serverSocket.close() +} + } + + test("Run spark streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +mainClass = "org.apache.spark.examples.streaming.NetworkWordCount", +appArgs = Array[String](host, port.toString), +expectedJVMValue = Seq("spark-streaming-kube")) +} finally { + serverSocket.close() +} + } + + test("Run spark structured streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +mainClass = 
"org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount", +appArgs = Array[String](host, port.toString), +expectedJVMValue = Seq("spark-streaming-kube")) +} finally { + serverSocket.close() +} + } + + test("Run spark structured streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +val driverService = driverServiceSetup +try { + setupSparkStreamingPod(driverService.getMetadata.getName) +.addToArgs("sql.streaming.StructuredNetworkWordCount") +.addToArgs(host, port.toString) +.endContainer() +.endSpec() +.done() + + val TIMEOUT = PatienceConfiguration.Timeout(Span(3, Minutes)) + Eventually.eventually(TIMEOUT, INTERVAL) { +assert(getRunLog.contains("spark-streaming-kube"), + "The application did not complete.") + } +} +finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents +.kubernetesClient +.services()
[GitHub] spark issue #22722: [SPARK-24432][k8s] Add support for dynamic resource allo...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22722 @huafengw Right now I am testing and further developing the experimental design (Option No. 5) that was proposed in https://issues.apache.org/jira/browse/SPARK-25299, building off of the work done by @mccheah. This is targeted for 3.0. I will put up a WIP soon, which I would love your input on, once we convince the community of the design proposed in the WIP. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22608: [SPARK-23257][K8S][TESTS] Kerberos Support Integr...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22608#discussion_r225631577 --- Diff: bin/docker-image-tool.sh --- @@ -71,18 +71,29 @@ function build { --build-arg base_img=$(image_ref spark) ) - local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"} - local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"} - local RDOCKERFILE=${RDOCKERFILE:-"$IMG_PATH/spark/bindings/R/Dockerfile"} + local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/main/dockerfiles/spark/Dockerfile"} + local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/main/dockerfiles/spark/bindings/python/Dockerfile"} + local KDOCKERFILE=${KDOCKERFILE:-"$IMG_PATH/test/dockerfiles/spark/kerberos/Dockerfile"} --- End diff -- It is a Docker image builder, so it seems to be an appropriate place, especially since it's the first stage in our integration tests. But I am open to opinions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22608: [SPARK-23257][K8S][TESTS] Kerberos Support Integr...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22608#discussion_r225630500 --- Diff: bin/docker-image-tool.sh --- @@ -71,18 +71,29 @@ function build { --build-arg base_img=$(image_ref spark) ) - local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"} - local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"} - local RDOCKERFILE=${RDOCKERFILE:-"$IMG_PATH/spark/bindings/R/Dockerfile"} + local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/main/dockerfiles/spark/Dockerfile"} + local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/main/dockerfiles/spark/bindings/python/Dockerfile"} + local KDOCKERFILE=${KDOCKERFILE:-"$IMG_PATH/test/dockerfiles/spark/kerberos/Dockerfile"} + local RDOCKERFILE=${RDOCKERFILE:-"$IMG_PATH/main/dockerfiles/spark/bindings/R/Dockerfile"} + # Spark Base docker build $NOCACHEARG "${BUILD_ARGS[@]}" \ -t $(image_ref spark) \ -f "$BASEDOCKERFILE" . + # PySpark docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \ -t $(image_ref spark-py) \ -f "$PYDOCKERFILE" . + # The following are optional docker builds for Kerberos Testing + docker pull ifilonenko/hadoop-base:latest --- End diff -- The strategy of using this "personal" image is purely because the HDFS nodes (kdc, nn, dn) themselves require hadoop-2.7.3.tgz. Unless that can be packaged in the distribution, in which case it would be trivial to build, pulling is easier. I am open to opinions on building strategies. The Dockerfile in question can be found here: https://github.com/ifilonenko/hadoop-kerberos-helm/blob/master/Dockerfile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22704: [SPARK-25681][K8S][WIP] Leverage a config to tune renewa...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22704 @vanzin and @jerryshao for opinions on the approach. The classes are quite difficult to unit-test with mocking libraries, so maybe a refactor is necessary, but I was wondering whether the overall approach would still work with YARN, and whether introducing a K8S-specific config here is okay. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22608: [SPARK-23257][K8S][TESTS] Kerberos Support Integration T...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22608 @mccheah and @erikerlandson for review as the `Secure HDFS test with HDFS keytab (Cluster Mode)` passes and should be merged to enable Secure HDFS interaction. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22608: [SPARK-23257][K8S][TESTS] Kerberos Support Integration T...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/22608 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19045: [WIP][SPARK-20628][CORE] Keep track of nodes (/ spot ins...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/19045 please rename PR with [K8S] flag to launch tests. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19045: [WIP][SPARK-20628][CORE] Keep track of nodes (/ spot ins...
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/19045 test this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22704: [SPARK-25681][K8S][WIP] Leverage a config to tune...
GitHub user ifilonenko opened a pull request: https://github.com/apache/spark/pull/22704 [SPARK-25681][K8S][WIP] Leverage a config to tune renewal time retrieval ## What changes were proposed in this pull request? Changes to core allow K8S to pass a SparkConf specifying whether the `obtainDelegationTokens()` logic fetches `renewalInterval` (in some use cases the DT renewal may be external to Spark, e.g. K8s Cluster Mode) and whether this renewal interval calculation is done by retrieving a second DT, as YARN does. ## How was this patch tested? - [ ] Unit Tests - [ ] Integration Tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/ifilonenko/spark SPARK-25681 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22704.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22704 commit cba8abbaadb44ff064d348324da7922e514b9387 Author: Ilan Filonenko Date: 2018-10-11T22:26:39Z first WIP commit commit 6e807e169cc9113c5fcd1653e610ec473c1ff8e8 Author: Ilan Filonenko Date: 2018-10-11T22:30:06Z modified sentence --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21669: [SPARK-23257][K8S] Kerberos Support for Spark on ...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21669#discussion_r224250496 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features.hadooputils + +import io.fabric8.kubernetes.api.model.SecretBuilder +import org.apache.commons.codec.binary.Base64 + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager + + /** --- End diff -- I apologize, these slipped through the cracks. I will definitely add an additional linter / style checker, as the Jenkins scalastyle checks seem to be quite primitive and do not complain. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21669: [SPARK-23257][K8S] Kerberos Support for Spark on ...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21669#discussion_r223885097 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.HasMetadata + +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesDriverSpecificConf +import org.apache.spark.deploy.k8s.features.hadooputils._ +import org.apache.spark.internal.Logging + + /** + * Runs the necessary Hadoop-based logic based on Kerberos configs and the presence of the + * HADOOP_CONF_DIR. This runs various bootstrap methods defined in HadoopBootstrapUtil. 
+ */ +private[spark] class KerberosConfDriverFeatureStep( + kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) + extends KubernetesFeatureConfigStep with Logging { + + require(kubernetesConf.hadoopConfSpec.isDefined, + "Ensure that HADOOP_CONF_DIR is defined either via env or a pre-defined ConfigMap") + private val hadoopConfDirSpec = kubernetesConf.hadoopConfSpec.get + private val conf = kubernetesConf.sparkConf + private val principal = conf.get(org.apache.spark.internal.config.PRINCIPAL) + private val keytab = conf.get(org.apache.spark.internal.config.KEYTAB) + private val existingSecretName = conf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME) + private val existingSecretItemKey = conf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY) + private val krb5File = conf.get(KUBERNETES_KERBEROS_KRB5_FILE) + private val krb5CMap = conf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP) + private val kubeTokenManager = kubernetesConf.tokenManager(conf, + SparkHadoopUtil.get.newConfiguration(conf)) + private val isKerberosEnabled = + (hadoopConfDirSpec.hadoopConfDir.isDefined && kubeTokenManager.isSecurityEnabled) || + (hadoopConfDirSpec.hadoopConfigMapName.isDefined && + (krb5File.isDefined || krb5CMap.isDefined)) + + require(keytab.isEmpty || isKerberosEnabled, + "You must enable Kerberos support if you are specifying a Kerberos Keytab") + + require(existingSecretName.isEmpty || isKerberosEnabled, + "You must enable Kerberos support if you are specifying a Kerberos Secret") + + require((krb5File.isEmpty || krb5CMap.isEmpty) || isKerberosEnabled, + "You must specify either a krb5 file location or a ConfigMap with a krb5 file") + + KubernetesUtils.requireNandDefined( + krb5File, + krb5CMap, + "Do not specify both a Krb5 local file and the ConfigMap as the creation " + + "of an additional ConfigMap, when one is already specified, is extraneous") + + KubernetesUtils.requireBothOrNeitherDefined( + keytab, + principal, + "If a Kerberos principal is specified you must also specify a 
Kerberos keytab", + "If a Kerberos keytab is specified you must also specify a Kerberos principal") + + KubernetesUtils.requireBothOrNeitherDefined( + existingSecretName, + existingSecretItemKey, + "If a secret data item-key where the data of the Kerberos Delegation Token is specified" + +" you must also specify the name of the secret", + "If a secret storing a Kerberos Delegation Token is specified you must also" + +" specify the item-key where the data is stored") + + private val hadoopConfigurationFiles = hadoopConfDirSpec.had
[GitHub] spark issue #21669: [SPARK-23257][K8S] Kerberos Support for Spark on K8S
Github user ifilonenko commented on the issue: https://github.com/apache/spark/pull/21669 thank you @liyinan926 for your review! @vanzin, ill leave it to you to give the final LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21669: [SPARK-23257][K8S] Kerberos Support for Spark on ...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21669#discussion_r223535529 --- Diff: docs/running-on-kubernetes.md --- @@ -820,4 +820,37 @@ specific to Spark on Kubernetes. This sets the major Python version of the docker image used to run the driver and executor containers. Can either be 2 or 3. + + spark.kubernetes.kerberos.krb5.location + (none) + + Specify the local location of the krb5 file to be mounted on the driver and executors for Kerberos interaction. + It is important to note that for local files, the KDC defined needs to be visible from inside the containers. --- End diff -- True --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21669: [SPARK-23257][K8S] Kerberos Support for Spark on ...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21669#discussion_r223525420 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features.hadooputils + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.SecretBuilder +import org.apache.commons.codec.binary.Base64 + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager + + /** + * This logic does all the heavy lifting for Delegation Token creation. This step + * assumes that the job user has either specified a principal and keytab or ran + * $kinit before running spark-submit. By running UGI.getCurrentUser we are able + * to obtain the current user, either signed in via $kinit or keytab. With the + * Job User principal you then retrieve the delegation token from the NameNode + * and store values in DelegationToken. 
Lastly, the class puts the data into + * a secret. All this is defined in a KerberosConfigSpec. + */ +private[spark] object HadoopKerberosLogin { + def buildSpec( + submissionSparkConf: SparkConf, + kubernetesResourceNamePrefix : String, + tokenManager: KubernetesHadoopDelegationTokenManager): KerberosConfigSpec = { + val hadoopConf = SparkHadoopUtil.get.newConfiguration(submissionSparkConf) + if (!tokenManager.isSecurityEnabled) { + throw new SparkException("Hadoop not configured with Kerberos") + } + // The JobUserUGI will be taken fom the Local Ticket Cache or via keytab+principal + // The login happens in the SparkSubmit so login logic is not necessary to include + val jobUserUGI = tokenManager.getCurrentUser + val originalCredentials = jobUserUGI.getCredentials + val (tokenData, renewalInterval) = tokenManager.getDelegationTokens( + originalCredentials, + submissionSparkConf, + hadoopConf) + require(tokenData.nonEmpty, "Did not obtain any delegation tokens") + val currentTime = tokenManager.getCurrentTime + val initialTokenDataKeyName = s"$KERBEROS_SECRET_LABEL_PREFIX-$currentTime-$renewalInterval" --- End diff -- > why this config name needs to reference those values? Because, either way, an external renewal service will always need to know the renewal interval and the current time to calculate when to perform the renewal. > I'm sure we'll have discussions about how to make requests to the service and propagate any needed configuration. Agreed, that will be the case in follow-up PRs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21669: [SPARK-23257][K8S] Kerberos Support for Spark on ...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21669#discussion_r223524110 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features.hadooputils + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.SecretBuilder +import org.apache.commons.codec.binary.Base64 + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager + + /** + * This logic does all the heavy lifting for Delegation Token creation. This step + * assumes that the job user has either specified a principal and keytab or ran + * $kinit before running spark-submit. By running UGI.getCurrentUser we are able + * to obtain the current user, either signed in via $kinit or keytab. With the + * Job User principal you then retrieve the delegation token from the NameNode + * and store values in DelegationToken. 
Lastly, the class puts the data into + * a secret. All this is defined in a KerberosConfigSpec. + */ +private[spark] object HadoopKerberosLogin { + def buildSpec( + submissionSparkConf: SparkConf, + kubernetesResourceNamePrefix : String, + tokenManager: KubernetesHadoopDelegationTokenManager): KerberosConfigSpec = { + val hadoopConf = SparkHadoopUtil.get.newConfiguration(submissionSparkConf) + if (!tokenManager.isSecurityEnabled) { + throw new SparkException("Hadoop not configured with Kerberos") + } + // The JobUserUGI will be taken fom the Local Ticket Cache or via keytab+principal + // The login happens in the SparkSubmit so login logic is not necessary to include + val jobUserUGI = tokenManager.getCurrentUser + val originalCredentials = jobUserUGI.getCredentials + val (tokenData, renewalInterval) = tokenManager.getDelegationTokens( + originalCredentials, + submissionSparkConf, + hadoopConf) + require(tokenData.nonEmpty, "Did not obtain any delegation tokens") + val currentTime = tokenManager.getCurrentTime + val initialTokenDataKeyName = s"$KERBEROS_SECRET_LABEL_PREFIX-$currentTime-$renewalInterval" --- End diff -- We built a renewal service (a microservice based on the design doc linked in the description) that used `currentTime` and `renewalInterval` to know when to update the secrets. It determined whether or not to "renew" secrets by using the `refresh-token` label. This was the first step toward a separate renewal service; however, in our company (and in other companies' use cases), these renewal services should be arbitrary and pluggable. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21669: [SPARK-23257][K8S] Kerberos Support for Spark on ...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21669#discussion_r223523341 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala --- @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import scala.collection.JavaConverters._ + +import com.google.common.base.Charsets +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, HasMetadata} + +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesDriverSpecificConf +import org.apache.spark.deploy.k8s.features.hadooputils._ +import org.apache.spark.internal.Logging + + /** + * Runs the necessary Hadoop-based logic based on Kerberos configs and the presence of the + * HADOOP_CONF_DIR. This runs various bootstrap methods defined in HadoopBootstrapUtil. 
+ */ +private[spark] class KerberosConfDriverFeatureStep( +kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) +extends KubernetesFeatureConfigStep with Logging { + +private val conf = kubernetesConf.sparkConf +private val maybePrincipal = conf.get(org.apache.spark.internal.config.PRINCIPAL) +private val maybeKeytab = conf.get(org.apache.spark.internal.config.KEYTAB) +private val maybeExistingSecretName = conf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME) +private val maybeExistingSecretItemKey = + conf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY) +private val maybeKrb5File = + conf.get(KUBERNETES_KERBEROS_KRB5_FILE) +private val maybeKrb5CMap = + conf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP) +private val kubeTokenManager = kubernetesConf.tokenManager(conf, + SparkHadoopUtil.get.newConfiguration(conf)) +private val isKerberosEnabled = kubeTokenManager.isSecurityEnabled + +require(maybeKeytab.isEmpty || isKerberosEnabled, + "You must enable Kerberos support if you are specifying a Kerberos Keytab") + +require(maybeExistingSecretName.isEmpty || isKerberosEnabled, + "You must enable Kerberos support if you are specifying a Kerberos Secret") + + require((maybeKrb5File.isEmpty || maybeKrb5CMap.isEmpty) || isKerberosEnabled, +"You must specify either a krb5 file location or a ConfigMap with a krb5 file") + + KubernetesUtils.requireNandDefined( + maybeKrb5File, + maybeKrb5CMap, + "Do not specify both a Krb5 local file and the ConfigMap as the creation " + + "of an additional ConfigMap, when one is already specified, is extraneous") + +KubernetesUtils.requireBothOrNeitherDefined( + maybeKeytab, + maybePrincipal, + "If a Kerberos principal is specified you must also specify a Kerberos keytab", + "If a Kerberos keytab is specified you must also specify a Kerberos principal") + +KubernetesUtils.requireBothOrNeitherDefined( + maybeExistingSecretName, + maybeExistingSecretItemKey, + "If a secret data item-key where the data of the Kerberos Delegation Token is specified" + +" 
you must also specify the name of the secret", + "If a secret storing a Kerberos Delegation Token is specified you must also" + +" specify the item-key where the data is stored") + +require(kubernetesConf.hadoopConfDir.isDefined, "Ensure that HADOOP_CONF_DIR is defined") +private val hadoopConfDir = kubernetesConf.hadoopConfDir.get +private val hadoopConfigurationFiles
[GitHub] spark pull request #21669: [SPARK-23257][K8S] Kerberos Support for Spark on ...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21669#discussion_r223492379 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.HasMetadata + +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil +import org.apache.spark.internal.Logging + + /** + * This step is responsible for bootstraping the container with ConfigMaps + * containing Hadoop config files mounted as volumes and an ENV variable + * pointed to the mounted file directory. 
+ */ +private[spark] class HadoopConfExecutorFeatureStep( + kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]) + extends KubernetesFeatureConfigStep with Logging{ + + override def configurePod(pod: SparkPod): SparkPod = { + val sparkConf = kubernetesConf.sparkConf + val maybeHadoopConfDir = sparkConf.getOption(HADOOP_CONF_DIR_LOC) + val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME) + require(maybeHadoopConfDir.isDefined && maybeHadoopConfigMap.isDefined, --- End diff -- At the moment, it is just HADOOP_CONF_DIR; the ability to use an existing ConfigMap is an additional feature that will only be relevant with the pod-template feature, which is still unmerged. As such, I haven't included that check yet; it was only mentioned in the docs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21669: [SPARK-23257][K8S] Kerberos Support for Spark on ...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21669#discussion_r223483718 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.HasMetadata + +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil +import org.apache.spark.internal.Logging + + /** + * This step is responsible for bootstraping the container with ConfigMaps + * containing Hadoop config files mounted as volumes and an ENV variable + * pointed to the mounted file directory. 
+ */ +private[spark] class HadoopConfExecutorFeatureStep( + kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]) + extends KubernetesFeatureConfigStep with Logging{ + + override def configurePod(pod: SparkPod): SparkPod = { + val sparkConf = kubernetesConf.sparkConf + val maybeHadoopConfDir = sparkConf.getOption(HADOOP_CONF_DIR_LOC) + val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME) + require(maybeHadoopConfDir.isDefined && maybeHadoopConfigMap.isDefined, --- End diff -- `HADOOP_CONF_DIR_LOC` is set from `kubernetesConf.hadoopConfDir.get` (`HADOOP_CONF_DIR_LOC -> kubernetesConf.hadoopConfDir.get`), so it is defined by extension. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21669: [SPARK-23257][K8S] Kerberos Support for Spark on ...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21669#discussion_r223481427 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import scala.collection.JavaConverters._ + +import com.google.common.base.Charsets +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, HasMetadata} + +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesDriverSpecificConf +import org.apache.spark.deploy.k8s.features.hadooputils._ +import org.apache.spark.internal.Logging + + /** + * Runs the necessary Hadoop-based logic based on Kerberos configs and the presence of the + * HADOOP_CONF_DIR. This runs various bootstrap methods defined in HadoopBootstrapUtil. 
+ */ +private[spark] class KerberosConfDriverFeatureStep( +kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) +extends KubernetesFeatureConfigStep with Logging { + +private val conf = kubernetesConf.sparkConf +private val maybePrincipal = conf.get(org.apache.spark.internal.config.PRINCIPAL) +private val maybeKeytab = conf.get(org.apache.spark.internal.config.KEYTAB) +private val maybeExistingSecretName = conf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME) +private val maybeExistingSecretItemKey = + conf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY) +private val maybeKrb5File = + conf.get(KUBERNETES_KERBEROS_KRB5_FILE) +private val maybeKrb5CMap = + conf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP) +private val kubeTokenManager = kubernetesConf.tokenManager(conf, + SparkHadoopUtil.get.newConfiguration(conf)) +private val isKerberosEnabled = kubeTokenManager.isSecurityEnabled + +require(maybeKeytab.isEmpty || isKerberosEnabled, + "You must enable Kerberos support if you are specifying a Kerberos Keytab") + +require(maybeExistingSecretName.isEmpty || isKerberosEnabled, + "You must enable Kerberos support if you are specifying a Kerberos Secret") + + require((maybeKrb5File.isEmpty || maybeKrb5CMap.isEmpty) || isKerberosEnabled, +"You must specify either a krb5 file location or a ConfigMap with a krb5 file") + + KubernetesUtils.requireNandDefined( + maybeKrb5File, + maybeKrb5CMap, + "Do not specify both a Krb5 local file and the ConfigMap as the creation " + + "of an additional ConfigMap, when one is already specified, is extraneous") + +KubernetesUtils.requireBothOrNeitherDefined( + maybeKeytab, + maybePrincipal, + "If a Kerberos principal is specified you must also specify a Kerberos keytab", + "If a Kerberos keytab is specified you must also specify a Kerberos principal") + +KubernetesUtils.requireBothOrNeitherDefined( + maybeExistingSecretName, + maybeExistingSecretItemKey, + "If a secret data item-key where the data of the Kerberos Delegation Token is specified" + +" 
you must also specify the name of the secret", + "If a secret storing a Kerberos Delegation Token is specified you must also" + +" specify the item-key where the data is stored") + +require(kubernetesConf.hadoopConfDir.isDefined, "Ensure that HADOOP_CONF_DIR is defined") +private val hadoopConfDir = kubernetesConf.hadoopConfDir.get +private val hadoopConfigurationFiles