Github user liyinan926 commented on a diff in the pull request:
https://github.com/apache/spark/pull/19717#discussion_r154168292
--- Diff:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder,
EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+ kubernetesAppId: String,
+ kubernetesResourceNamePrefix: String,
+ driverLabels: Map[String, String],
+ dockerImagePullPolicy: String,
+ appName: String,
+ mainClass: String,
+ appArgs: Array[String],
+ submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+ private val kubernetesDriverPodName =
submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+ .getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+ private val driverExtraClasspath = submissionSparkConf.get(
+ org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+ private val driverDockerImage =
submissionSparkConf.get(DRIVER_DOCKER_IMAGE)
+
+ // CPU settings
+ private val driverCpuCores =
submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+ private val driverLimitCores =
submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+ // Memory settings
+ private val driverMemoryMiB = submissionSparkConf.get(
+ org.apache.spark.internal.config.DRIVER_MEMORY)
+ private val driverMemoryString = submissionSparkConf.get(
+ org.apache.spark.internal.config.DRIVER_MEMORY.key,
+ org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+ private val memoryOverheadMiB = submissionSparkConf
+ .get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+ .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+ MEMORY_OVERHEAD_MIN_MIB))
+ private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB +
memoryOverheadMiB
+
+ override def configureDriver(
+ driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+ val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+ new EnvVarBuilder()
+ .withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+ .withValue(classPath)
+ .build()
+ }
+
+ val driverCustomAnnotations = ConfigurationUtils
+ .parsePrefixedKeyValuePairs(
+ submissionSparkConf,
+ KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+ require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+ s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as
it is reserved for" +
+ " Spark bookkeeping operations.")
+
+ val driverCustomEnvs =
submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+ .map(env => new EnvVarBuilder()
+ .withName(env._1)
+ .withValue(env._2)
+ .build())
+
+ val allDriverAnnotations = driverCustomAnnotations ++
Map(SPARK_APP_NAME_ANNOTATION -> appName)
+
+ val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
+ submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
+
+ val driverCpuQuantity = new QuantityBuilder(false)
+ .withAmount(driverCpuCores)
+ .build()
+ val driverMemoryQuantity = new QuantityBuilder(false)
+ .withAmount(s"${driverMemoryMiB}Mi")
+ .build()
+ val driverMemoryLimitQuantity = new QuantityBuilder(false)
+ .withAmount(s"${driverContainerMemoryWithOverheadMiB}Mi")
+ .build()
+ val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
+ ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
+ }
+
+ val driverContainer = new ContainerBuilder(driverSpec.driverContainer)
+ .withName(DRIVER_CONTAINER_NAME)
+ .withImage(driverDockerImage)
+ .withImagePullPolicy(dockerImagePullPolicy)
+ .addAllToEnv(driverCustomEnvs.asJava)
+ .addToEnv(driverExtraClasspathEnv.toSeq: _*)
+ .addNewEnv()
+ .withName(ENV_DRIVER_MEMORY)
+ .withValue(driverMemoryString)
+ .endEnv()
+ .addNewEnv()
+ .withName(ENV_DRIVER_MAIN_CLASS)
+ .withValue(mainClass)
+ .endEnv()
+ .addNewEnv()
+ .withName(ENV_DRIVER_ARGS)
+ .withValue(appArgs.mkString(" "))
--- End diff --
Good catch. Added `.map(arg => s"\\\"$arg\\\"")` before `mkString`.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]