Github user liyinan926 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20910#discussion_r179313348
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class BasicExecutorFeatureStep(
+ kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+ extends KubernetesFeatureConfigStep {
+
+ // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
+ private val executorExtraClasspath = kubernetesConf.get(EXECUTOR_CLASS_PATH)
+ private val executorContainerImage = kubernetesConf
+ .get(EXECUTOR_CONTAINER_IMAGE)
+ .getOrElse(throw new SparkException("Must specify the executor container image"))
+ private val blockManagerPort = kubernetesConf
+ .sparkConf
+ .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+
+ private val executorPodNamePrefix = kubernetesConf.appResourceNamePrefix
+
+ private val driverUrl = RpcEndpointAddress(
+ kubernetesConf.get("spark.driver.host"),
+ kubernetesConf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+ CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+ private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
+ private val executorMemoryString = kubernetesConf.get(
+ EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
+
+ private val memoryOverheadMiB = kubernetesConf
+ .get(EXECUTOR_MEMORY_OVERHEAD)
+ .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+ MEMORY_OVERHEAD_MIN_MIB))
+ private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
+
+ private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
+ private val executorCoresRequest =
+ if (kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
+ kubernetesConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
+ } else {
+ executorCores.toString
+ }
+ private val executorLimitCores = kubernetesConf.sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
--- End diff --
Looks like this can also be simplified to `kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)`, dropping the explicit hop through `sparkConf`.
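
For illustration, a minimal sketch of the suggested simplification, assuming (as the `executorExtraClasspath` lookup earlier in this file implies) that `KubernetesConf.get` delegates `ConfigEntry` lookups to the wrapped `SparkConf`:

```scala
// Before: reaches through the wrapped SparkConf explicitly.
private val executorLimitCores =
  kubernetesConf.sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)

// After: relies on KubernetesConf.get delegating to sparkConf.get for
// ConfigEntry lookups, matching the EXECUTOR_CLASS_PATH lookup above.
// KUBERNETES_EXECUTOR_LIMIT_CORES is created with .createOptional, so
// both forms yield an Option[String].
private val executorLimitCores =
  kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
```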
---