GitHub user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20910#discussion_r179242084
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
    @@ -14,104 +14,66 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -package org.apache.spark.scheduler.cluster.k8s
    +package org.apache.spark.deploy.k8s.features
     
     import scala.collection.JavaConverters._
     
    -import io.fabric8.kubernetes.api.model._
    +import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
     
    -import org.apache.spark.{SparkConf, SparkException}
    -import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
    +import org.apache.spark.SparkException
    +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
     import org.apache.spark.deploy.k8s.Config._
     import org.apache.spark.deploy.k8s.Constants._
     import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
    +import org.apache.spark.rpc.RpcEndpointAddress
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
     import org.apache.spark.util.Utils
     
    -/**
    - * A factory class for bootstrapping and creating executor pods with the given bootstrapping
    - * components.
    - *
    - * @param sparkConf Spark configuration
    - * @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto
    - *                              user-specified paths into the executor container
    - */
    -private[spark] class ExecutorPodFactory(
    -    sparkConf: SparkConf,
    -    mountSecretsBootstrap: Option[MountSecretsBootstrap]) {
    -
    -  private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
    -
    -  private val executorLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
    -    sparkConf,
    -    KUBERNETES_EXECUTOR_LABEL_PREFIX)
    -  require(
    -    !executorLabels.contains(SPARK_APP_ID_LABEL),
    -    s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
    -  require(
    -    !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
    -    s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
    -      " Spark.")
    -  require(
    -    !executorLabels.contains(SPARK_ROLE_LABEL),
    -    s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is 
reserved for Spark.")
    +private[spark] class BasicExecutorFeatureStep(
    +    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
    +  extends KubernetesFeatureConfigStep {
     
    -  private val executorAnnotations =
    -    KubernetesUtils.parsePrefixedKeyValuePairs(
    -      sparkConf,
    -      KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
    -  private val nodeSelector =
    -    KubernetesUtils.parsePrefixedKeyValuePairs(
    -      sparkConf,
    -      KUBERNETES_NODE_SELECTOR_PREFIX)
    -
    -  private val executorContainerImage = sparkConf
    +  // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
    +  private val executorExtraClasspath = kubernetesConf.get(EXECUTOR_CLASS_PATH)
    +  private val executorContainerImage = kubernetesConf
         .get(EXECUTOR_CONTAINER_IMAGE)
         .getOrElse(throw new SparkException("Must specify the executor container image"))
    -  private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
    -  private val blockManagerPort = sparkConf
    +  private val blockManagerPort = kubernetesConf
    +    .sparkConf
         .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
     
    -  private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
    +  private val executorPodNamePrefix = kubernetesConf.appResourceNamePrefix
     
    -  private val executorMemoryMiB = sparkConf.get(EXECUTOR_MEMORY)
    -  private val executorMemoryString = sparkConf.get(
    +  private val driverUrl = RpcEndpointAddress(
    +    kubernetesConf.sparkConf.get("spark.driver.host"),
    +    kubernetesConf.sparkConf.getInt("spark.driver.port", 
DEFAULT_DRIVER_PORT),
    +    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    +  private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
    +  private val executorMemoryString = kubernetesConf.get(
         EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
     
    -  private val memoryOverheadMiB = sparkConf
    +  private val memoryOverheadMiB = kubernetesConf
         .get(EXECUTOR_MEMORY_OVERHEAD)
         .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
           MEMORY_OVERHEAD_MIN_MIB))
       private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
     
    -  private val executorCores = sparkConf.getInt("spark.executor.cores", 1)
    -  private val executorCoresRequest = if (sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
    -    sparkConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
    -  } else {
    -    executorCores.toString
    -  }
    -  private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
    +  private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
    +  private val executorCoresRequest =
    +    if (kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
    +      kubernetesConf.sparkConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
    +    } else {
    +      executorCores.toString
    +    }
    +  private val executorLimitCores = kubernetesConf.sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
    --- End diff --
    
    Ditto.
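    
    A minimal sketch of the refactor this thread keeps pointing at, assuming "Ditto." echoes the earlier comments on this file: rather than each feature step reaching through kubernetesConf.sparkConf at every call site, the lookups could live behind accessors on KubernetesConf, as the in-diff comment ("Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf") itself suggests. The accessor names and the stubbed SparkConf below are hypothetical, not part of this PR:
    
        // Sketch only: hypothetical accessors on KubernetesConf that would replace
        // the repeated kubernetesConf.sparkConf.get* calls in the feature steps.
        // SparkConf is stubbed here so the sketch is self-contained; the real
        // class is org.apache.spark.SparkConf.
        class SparkConf {
          private val settings = scala.collection.mutable.Map.empty[String, String]
          def set(key: String, value: String): SparkConf = { settings(key) = value; this }
          def getInt(key: String, default: Int): Int =
            settings.get(key).map(_.toInt).getOrElse(default)
          def getOption(key: String): Option[String] = settings.get(key)
        }
    
        // Hypothetical: a feature step would call conf.executorCores instead of
        // conf.sparkConf.getInt("spark.executor.cores", 1) at every use site.
        class KubernetesConf(val sparkConf: SparkConf) {
          def executorCores: Int = sparkConf.getInt("spark.executor.cores", 1)
          def executorLimitCores: Option[String] =
            sparkConf.getOption("spark.kubernetes.executor.limit.cores")
          def blockManagerPort: Int = sparkConf.getInt("spark.blockmanager.port", 7079)
        }
    
        object Sketch extends App {
          val conf = new KubernetesConf(new SparkConf().set("spark.executor.cores", "2"))
          println(conf.executorCores)      // 2
          println(conf.executorLimitCores) // None
        }
    
    The upside would be that raw config keys and their defaults live in one place instead of being repeated across the feature steps.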

