[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20910


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-11 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r180784491
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesSpec.scala ---
@@ -14,17 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.k8s.submit.steps
+package org.apache.spark.deploy.k8s
 
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import io.fabric8.kubernetes.api.model.HasMetadata
 
-/**
- * Represents a step in configuring the Spark driver pod.
- */
-private[spark] trait DriverConfigurationStep {
+private[spark] case class KubernetesSpec(
--- End diff --

I think we could move some of the logic from `KubernetesClientApplication`
into `KubernetesDriverBuilder`. Do you have any suggestions on whether we
should shift the abstraction boundaries a bit?
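
Roughly, a sketch of what that boundary could look like. This assumes the
types quoted elsewhere in this thread; the fold over feature steps and the
system-properties field name are my guesses, not the final API:

```
// Hypothetical sketch: the builder, rather than KubernetesClientApplication,
// assembles the complete driver spec by folding feature steps over an
// initial empty spec.
private[spark] class KubernetesDriverBuilder(
    provideSteps: KubernetesConf[KubernetesDriverSpecificConf] =>
      Seq[KubernetesFeatureConfigStep]) {

  def buildFromFeatures(
      conf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesSpec = {
    val initialSpec = KubernetesSpec(
      pod = SparkPod.initialPod(),
      additionalDriverKubernetesResources = Seq.empty,
      podJavaSystemProperties = Map.empty) // field name assumed
    provideSteps(conf).foldLeft(initialSpec) { (spec, step) =>
      // Each step contributes its piece of the pod; accumulating extra
      // resources and system properties per step would slot in here too.
      spec.copy(pod = step.configurePod(spec.pod))
    }
  }
}
```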


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-11 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r180782338
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesSpec.scala ---
@@ -14,17 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.k8s.submit.steps
+package org.apache.spark.deploy.k8s
 
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import io.fabric8.kubernetes.api.model.HasMetadata
 
-/**
- * Represents a step in configuring the Spark driver pod.
- */
-private[spark] trait DriverConfigurationStep {
+private[spark] case class KubernetesSpec(
--- End diff --

If you check `KubernetesClientApplication`, it needs the extra fields here 
(the additional driver resources and the driver system properties) to construct 
the pod. In other words, the driver builder has to return a structure with more 
than just the pod.
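
As a sketch, the shape that implies is something like this (the first two
fields appear in the diff quoted later in this thread; the system-properties
field name is my assumption):

```
private[spark] case class KubernetesSpec(
    pod: SparkPod,
    additionalDriverKubernetesResources: Seq[HasMetadata],
    podJavaSystemProperties: Map[String, String]) // field name assumed
```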


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-10 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r180535990
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesSpec.scala ---
@@ -14,17 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.k8s.submit.steps
+package org.apache.spark.deploy.k8s
 
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import io.fabric8.kubernetes.api.model.HasMetadata
 
-/**
- * Represents a step in configuring the Spark driver pod.
- */
-private[spark] trait DriverConfigurationStep {
+private[spark] case class KubernetesSpec(
--- End diff --

This class is named as though it applies to driver and executor 
construction. Maybe `KubernetesDriverSpec`? It's also a bit unclear to me what 
purpose this abstraction serves as opposed to the way 
`KubernetesExecutorBuilder` goes about building the pod.


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179313348
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, 
ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, 
PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, 
EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class BasicExecutorFeatureStep(
+kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+  extends KubernetesFeatureConfigStep {
+
+  // Consider moving some of these fields to KubernetesConf or 
KubernetesExecutorSpecificConf
+  private val executorExtraClasspath = 
kubernetesConf.get(EXECUTOR_CLASS_PATH)
+  private val executorContainerImage = kubernetesConf
+.get(EXECUTOR_CONTAINER_IMAGE)
+.getOrElse(throw new SparkException("Must specify the executor 
container image"))
+  private val blockManagerPort = kubernetesConf
+.sparkConf
+.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+
+  private val executorPodNamePrefix = kubernetesConf.appResourceNamePrefix
+
+  private val driverUrl = RpcEndpointAddress(
+kubernetesConf.get("spark.driver.host"),
+kubernetesConf.sparkConf.getInt("spark.driver.port", 
DEFAULT_DRIVER_PORT),
+CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+  private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
+  private val executorMemoryString = kubernetesConf.get(
+EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = kubernetesConf
+.get(EXECUTOR_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
+
+  private val executorCores = 
kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
+  private val executorCoresRequest =
+if 
(kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
+  kubernetesConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
+} else {
+  executorCores.toString
+}
+  private val executorLimitCores = 
kubernetesConf.sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
--- End diff --

Looks like this can also be simplified to `kubernetesConf.get`.
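
For reference, the delegation that makes this work is already in the
`KubernetesConf` diff quoted in this thread
(`def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)`), so the
field could become:

```
private val executorLimitCores =
  kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
```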


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread kimoonkim
Github user kimoonkim commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179302211
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -175,49 +138,41 @@ private[spark] class ExecutorPodFactory(
   .withPorts(requiredPorts.asJava)
   .addToArgs("executor")
   .build()
-
-val executorPod = new PodBuilder()
-  .withNewMetadata()
-.withName(name)
-.withLabels(resolvedExecutorLabels.asJava)
-.withAnnotations(executorAnnotations.asJava)
-.withOwnerReferences()
-  .addNewOwnerReference()
-.withController(true)
-.withApiVersion(driverPod.getApiVersion)
-.withKind(driverPod.getKind)
-.withName(driverPod.getMetadata.getName)
-.withUid(driverPod.getMetadata.getUid)
-.endOwnerReference()
-.endMetadata()
-  .withNewSpec()
-.withHostname(hostname)
-.withRestartPolicy("Never")
-.withNodeSelector(nodeSelector.asJava)
-.endSpec()
-  .build()
-
 val containerWithLimitCores = executorLimitCores.map { limitCores =>
   val executorCpuLimitQuantity = new QuantityBuilder(false)
 .withAmount(limitCores)
 .build()
   new ContainerBuilder(executorContainer)
 .editResources()
-.addToLimits("cpu", executorCpuLimitQuantity)
-.endResources()
+  .addToLimits("cpu", executorCpuLimitQuantity)
--- End diff --

I see it now. Thanks!


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179297367
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -175,49 +138,41 @@ private[spark] class ExecutorPodFactory(
   .withPorts(requiredPorts.asJava)
   .addToArgs("executor")
   .build()
-
-val executorPod = new PodBuilder()
-  .withNewMetadata()
-.withName(name)
-.withLabels(resolvedExecutorLabels.asJava)
-.withAnnotations(executorAnnotations.asJava)
-.withOwnerReferences()
-  .addNewOwnerReference()
-.withController(true)
-.withApiVersion(driverPod.getApiVersion)
-.withKind(driverPod.getKind)
-.withName(driverPod.getMetadata.getName)
-.withUid(driverPod.getMetadata.getUid)
-.endOwnerReference()
-.endMetadata()
-  .withNewSpec()
-.withHostname(hostname)
-.withRestartPolicy("Never")
-.withNodeSelector(nodeSelector.asJava)
-.endSpec()
-  .build()
-
 val containerWithLimitCores = executorLimitCores.map { limitCores =>
   val executorCpuLimitQuantity = new QuantityBuilder(false)
 .withAmount(limitCores)
 .build()
   new ContainerBuilder(executorContainer)
 .editResources()
-.addToLimits("cpu", executorCpuLimitQuantity)
-.endResources()
+  .addToLimits("cpu", executorCpuLimitQuantity)
--- End diff --

We do; it's set a few lines up.


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179297415
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -175,49 +138,41 @@ private[spark] class ExecutorPodFactory(
   .withPorts(requiredPorts.asJava)
   .addToArgs("executor")
   .build()
-
-val executorPod = new PodBuilder()
-  .withNewMetadata()
-.withName(name)
-.withLabels(resolvedExecutorLabels.asJava)
-.withAnnotations(executorAnnotations.asJava)
-.withOwnerReferences()
-  .addNewOwnerReference()
-.withController(true)
-.withApiVersion(driverPod.getApiVersion)
-.withKind(driverPod.getKind)
-.withName(driverPod.getMetadata.getName)
-.withUid(driverPod.getMetadata.getUid)
-.endOwnerReference()
-.endMetadata()
-  .withNewSpec()
-.withHostname(hostname)
-.withRestartPolicy("Never")
-.withNodeSelector(nodeSelector.asJava)
-.endSpec()
-  .build()
-
 val containerWithLimitCores = executorLimitCores.map { limitCores =>
   val executorCpuLimitQuantity = new QuantityBuilder(false)
 .withAmount(limitCores)
 .build()
   new ContainerBuilder(executorContainer)
 .editResources()
-.addToLimits("cpu", executorCpuLimitQuantity)
-.endResources()
+  .addToLimits("cpu", executorCpuLimitQuantity)
--- End diff --


https://github.com/apache/spark/pull/20910/files/7d65875266ba94f27d2cc8ec992e8e1cb8f593b5#diff-309414bee6f4741e43bd01e9a8fdR135


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179296929
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -14,104 +14,66 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.scheduler.cluster.k8s
+package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, 
ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, 
PodBuilder, QuantityBuilder}
 
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesExecutorSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, 
EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
-/**
- * A factory class for bootstrapping and creating executor pods with the 
given bootstrapping
- * components.
- *
- * @param sparkConf Spark configuration
- * @param mountSecretsBootstrap an optional component for mounting 
user-specified secrets onto
- *  user-specified paths into the executor 
container
- */
-private[spark] class ExecutorPodFactory(
-sparkConf: SparkConf,
-mountSecretsBootstrap: Option[MountSecretsBootstrap]) {
-
-  private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
-
-  private val executorLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-sparkConf,
-KUBERNETES_EXECUTOR_LABEL_PREFIX)
-  require(
-!executorLabels.contains(SPARK_APP_ID_LABEL),
-s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
-  require(
-!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
-s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
-  " Spark.")
-  require(
-!executorLabels.contains(SPARK_ROLE_LABEL),
-s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is 
reserved for Spark.")
+private[spark] class BasicExecutorFeatureStep(
+kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+  extends KubernetesFeatureConfigStep {
 
-  private val executorAnnotations =
-KubernetesUtils.parsePrefixedKeyValuePairs(
-  sparkConf,
-  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
-  private val nodeSelector =
-KubernetesUtils.parsePrefixedKeyValuePairs(
-  sparkConf,
-  KUBERNETES_NODE_SELECTOR_PREFIX)
-
-  private val executorContainerImage = sparkConf
+  // Consider moving some of these fields to KubernetesConf or 
KubernetesExecutorSpecificConf
+  private val executorExtraClasspath = 
kubernetesConf.get(EXECUTOR_CLASS_PATH)
+  private val executorContainerImage = kubernetesConf
 .get(EXECUTOR_CONTAINER_IMAGE)
 .getOrElse(throw new SparkException("Must specify the executor 
container image"))
-  private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
-  private val blockManagerPort = sparkConf
+  private val blockManagerPort = kubernetesConf
+.sparkConf
--- End diff --

A constant for this one in particular is tricky. The docs say that for all
cluster managers, if the block manager port is not provided, it is set to a
random number. But in Kubernetes we need to know the port numbers up front,
since we put them in the pod spec, so we always set the port to a fixed
number if it isn't specifically provided. And again, as before, for
configurations that are native to spark-core but haven't been given their own
config entry, I think it's fine to just use the raw string value.
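
A minimal sketch of that behavior, assuming the names quoted in the diff
(`DEFAULT_BLOCKMANAGER_PORT`) plus fabric8's `ContainerPortBuilder`; the
port name string is a placeholder:

```
// Pin the block manager port up front so it can be declared in the pod
// spec, instead of letting spark-core choose a random port at runtime.
val blockManagerPort = kubernetesConf
  .sparkConf
  .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)

val blockManagerContainerPort = new ContainerPortBuilder()
  .withName("blockmanager") // placeholder port name
  .withContainerPort(blockManagerPort)
  .build()
```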


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179296427
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, 
EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+
+private[spark] class BasicDriverFeatureStep(
+conf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+
+  private val driverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"${conf.appResourceNamePrefix}-driver")
+
+  private val driverContainerImage = conf
+.get(DRIVER_CONTAINER_IMAGE)
+.getOrElse(throw new SparkException("Must specify the driver container 
image"))
+
+  // CPU settings
+  private val driverCpuCores = 
conf.getOption("spark.driver.cores").getOrElse("1")
--- End diff --

For all of the properties that don't already have a `ConfigEntry` in 
`package.scala` in spark-core, I'm leaving them as raw strings. If you look for 
usages of `spark.driver.cores`, for example, the raw string is used everywhere.
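
For contrast, a typed entry would look roughly like the following. This is a
hypothetical sketch, as it might appear in spark-core's
`org.apache.spark.internal.config` package object; no such constant exists
as of this PR, which is exactly why the raw string is used:

```
// Hypothetical ConfigEntry for spark.driver.cores (not in package.scala).
private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
  .doc("Number of cores to use for the driver process.")
  .intConf
  .createWithDefault(1)
```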


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread kimoonkim
Github user kimoonkim commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179265065
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+mainAppResource: Option[MainAppResource],
+mainClass: String,
+appName: String,
+appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+executorId: String,
+driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
--- End diff --

I like this unified class. Maybe add a one-liner comment on what this 
represents?
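
Something like the following would do; the comment wording is just a
suggestion, and the signature is taken from the diff:

```
/**
 * Structure containing metadata for Kubernetes logic to build Spark pods;
 * the role-specific portion (driver vs. executor) lives in roleSpecificConf.
 */
private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
    sparkConf: SparkConf,
    roleSpecificConf: T,
    appResourceNamePrefix: String,
    appId: String,
    roleLabels: Map[String, String],
    roleAnnotations: Map[String, String],
    roleSecretNamesToMountPaths: Map[String, String],
    roleEnvs: Map[String, String])
```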


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread kimoonkim
Github user kimoonkim commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179267979
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, 
EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+
+private[spark] class BasicDriverFeatureStep(
+conf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+
+  private val driverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"${conf.appResourceNamePrefix}-driver")
+
+  private val driverContainerImage = conf
+.get(DRIVER_CONTAINER_IMAGE)
+.getOrElse(throw new SparkException("Must specify the driver container 
image"))
+
+  // CPU settings
+  private val driverCpuCores = 
conf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
+  private val memoryOverheadMiB = conf
+.get(DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, 
MEMORY_OVERHEAD_MIN_MIB))
+  private val driverMemoryWithOverheadMiB = driverMemoryMiB + 
memoryOverheadMiB
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+val driverCustomEnvs = conf.roleEnvs
+  .toSeq
+  .map { env =>
+new EnvVarBuilder()
+  .withName(env._1)
+  .withValue(env._2)
+  .build()
+  }
+
+val driverCpuQuantity = new QuantityBuilder(false)
+  .withAmount(driverCpuCores)
+  .build()
+val driverMemoryQuantity = new QuantityBuilder(false)
+  .withAmount(s"${driverMemoryWithOverheadMiB}Mi")
+  .build()
+val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
+  ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
+}
+
+val driverContainer = new ContainerBuilder(pod.container)
+  .withName(DRIVER_CONTAINER_NAME)
+  .withImage(driverContainerImage)
+  .withImagePullPolicy(conf.imagePullPolicy())
+  .addAllToEnv(driverCustomEnvs.asJava)
+  .addNewEnv()
+.withName(ENV_DRIVER_BIND_ADDRESS)
+.withValueFrom(new EnvVarSourceBuilder()
+  .withNewFieldRef("v1", "status.podIP")
+  .build())
+.endEnv()
+  .withNewResources()
+.addToRequests("cpu", driverCpuQuantity)
+.addToRequests("memory", driverMemoryQuantity)
+.addToLimits("memory", driverMemoryQuantity)
+.addToLimits(maybeCpuLimitQuantity.toMap.asJava)
--- End diff --

The ordering of lines 86 and 89 is a bit confusing. Maybe move the cpu limit
right next to the cpu request at line 86? Or swap with line 88 so we have cpu
and memory requests followed by cpu and memory limits in the same order?
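
e.g., the second option would read (a fragment of the builder chain above):

```
.withNewResources()
  .addToRequests("cpu", driverCpuQuantity)
  .addToRequests("memory", driverMemoryQuantity)
  .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
  .addToLimits("memory", driverMemoryQuantity)
  .endResources()
```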


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread kimoonkim
Github user kimoonkim commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179266001
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala ---
@@ -14,17 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.k8s.submit.steps
+package org.apache.spark.deploy.k8s
 
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, 
PodBuilder}
 
-/**
- * Represents a step in configuring the Spark driver pod.
- */
-private[spark] trait DriverConfigurationStep {
+private[spark] case class SparkPod(pod: Pod, container: Container)
 
-  /**
-   * Apply some transformation to the previous state of the driver to add 
a new feature to it.
-   */
-  def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec
+private[spark] object SparkPod {
+  def initialPod(): SparkPod = {
+SparkPod(
+  new 
PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
--- End diff --

I am not sure about Spark code style. But shouldn't we put one call per 
line for builders? i.e.
```
new PodBuilder()
  .withNewMetadata()
  .endMetadata()
  .withNewSpec()
  .endSpec()
  .build()
```


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread kimoonkim
Github user kimoonkim commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179273021
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+
+import org.apache.spark.deploy.k8s.SparkPod
+
+private[spark] trait KubernetesFeatureConfigStep {
--- End diff --

Since this is a common trait that will be used by multiple future step
implementations, maybe we want to add some class- and method-level comments
explaining what they are supposed to do?
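
For instance, something along these lines (a sketch: `configurePod` is from
the diff, while the extra-resources method is my assumption based on the
`HasMetadata` import):

```
/**
 * A single feature of a Spark pod under construction. Steps are independent
 * and composable: each receives the pod built so far and returns a copy
 * with its feature applied.
 */
private[spark] trait KubernetesFeatureConfigStep {

  /** Apply this feature to the given pod and return the modified pod. */
  def configurePod(pod: SparkPod): SparkPod

  /** Extra Kubernetes resources (e.g. secrets) this feature requires. */
  def getAdditionalKubernetesResources(): Seq[HasMetadata] // name assumed
}
```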


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread kimoonkim
Github user kimoonkim commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179269454
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -14,104 +14,66 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.scheduler.cluster.k8s
+package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, 
ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, 
PodBuilder, QuantityBuilder}
 
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesExecutorSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, 
EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
-/**
- * A factory class for bootstrapping and creating executor pods with the 
given bootstrapping
- * components.
- *
- * @param sparkConf Spark configuration
- * @param mountSecretsBootstrap an optional component for mounting 
user-specified secrets onto
- *  user-specified paths into the executor 
container
- */
-private[spark] class ExecutorPodFactory(
-sparkConf: SparkConf,
-mountSecretsBootstrap: Option[MountSecretsBootstrap]) {
-
-  private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
-
-  private val executorLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-sparkConf,
-KUBERNETES_EXECUTOR_LABEL_PREFIX)
-  require(
-!executorLabels.contains(SPARK_APP_ID_LABEL),
-s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
-  require(
-!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
-s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
-  " Spark.")
-  require(
-!executorLabels.contains(SPARK_ROLE_LABEL),
-s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is 
reserved for Spark.")
+private[spark] class BasicExecutorFeatureStep(
+kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+  extends KubernetesFeatureConfigStep {
 
-  private val executorAnnotations =
-KubernetesUtils.parsePrefixedKeyValuePairs(
-  sparkConf,
-  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
-  private val nodeSelector =
-KubernetesUtils.parsePrefixedKeyValuePairs(
-  sparkConf,
-  KUBERNETES_NODE_SELECTOR_PREFIX)
-
-  private val executorContainerImage = sparkConf
+  // Consider moving some of these fields to KubernetesConf or 
KubernetesExecutorSpecificConf
+  private val executorExtraClasspath = 
kubernetesConf.get(EXECUTOR_CLASS_PATH)
+  private val executorContainerImage = kubernetesConf
 .get(EXECUTOR_CONTAINER_IMAGE)
 .getOrElse(throw new SparkException("Must specify the executor 
container image"))
-  private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
-  private val blockManagerPort = sparkConf
+  private val blockManagerPort = kubernetesConf
+.sparkConf
--- End diff --

Same suggestion about using Scala constants if possible.


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread kimoonkim
Github user kimoonkim commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179270994
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -175,49 +138,41 @@ private[spark] class ExecutorPodFactory(
   .withPorts(requiredPorts.asJava)
   .addToArgs("executor")
   .build()
-
-val executorPod = new PodBuilder()
-  .withNewMetadata()
-.withName(name)
-.withLabels(resolvedExecutorLabels.asJava)
-.withAnnotations(executorAnnotations.asJava)
-.withOwnerReferences()
-  .addNewOwnerReference()
-.withController(true)
-.withApiVersion(driverPod.getApiVersion)
-.withKind(driverPod.getKind)
-.withName(driverPod.getMetadata.getName)
-.withUid(driverPod.getMetadata.getUid)
-.endOwnerReference()
-.endMetadata()
-  .withNewSpec()
-.withHostname(hostname)
-.withRestartPolicy("Never")
-.withNodeSelector(nodeSelector.asJava)
-.endSpec()
-  .build()
-
 val containerWithLimitCores = executorLimitCores.map { limitCores =>
   val executorCpuLimitQuantity = new QuantityBuilder(false)
 .withAmount(limitCores)
 .build()
   new ContainerBuilder(executorContainer)
 .editResources()
-.addToLimits("cpu", executorCpuLimitQuantity)
-.endResources()
+  .addToLimits("cpu", executorCpuLimitQuantity)
--- End diff --

Just curious: do we not support an executor CPU request?


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread kimoonkim
Github user kimoonkim commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179266562
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, 
EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+
+private[spark] class BasicDriverFeatureStep(
+conf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+
+  private val driverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"${conf.appResourceNamePrefix}-driver")
+
+  private val driverContainerImage = conf
+.get(DRIVER_CONTAINER_IMAGE)
+.getOrElse(throw new SparkException("Must specify the driver container 
image"))
+
+  // CPU settings
+  private val driverCpuCores = 
conf.getOption("spark.driver.cores").getOrElse("1")
--- End diff --

Maybe use a Scala constant for `spark.driver.cores` if it makes sense? I 
see other code lines use constants.


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179242084
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -14,104 +14,66 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.scheduler.cluster.k8s
+package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, 
ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, 
PodBuilder, QuantityBuilder}
 
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesExecutorSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, 
EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
-/**
- * A factory class for bootstrapping and creating executor pods with the 
given bootstrapping
- * components.
- *
- * @param sparkConf Spark configuration
- * @param mountSecretsBootstrap an optional component for mounting 
user-specified secrets onto
- *  user-specified paths into the executor 
container
- */
-private[spark] class ExecutorPodFactory(
-sparkConf: SparkConf,
-mountSecretsBootstrap: Option[MountSecretsBootstrap]) {
-
-  private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
-
-  private val executorLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-sparkConf,
-KUBERNETES_EXECUTOR_LABEL_PREFIX)
-  require(
-!executorLabels.contains(SPARK_APP_ID_LABEL),
-s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
-  require(
-!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
-s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
-  " Spark.")
-  require(
-!executorLabels.contains(SPARK_ROLE_LABEL),
-s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is 
reserved for Spark.")
+private[spark] class BasicExecutorFeatureStep(
+kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+  extends KubernetesFeatureConfigStep {
 
-  private val executorAnnotations =
-KubernetesUtils.parsePrefixedKeyValuePairs(
-  sparkConf,
-  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
-  private val nodeSelector =
-KubernetesUtils.parsePrefixedKeyValuePairs(
-  sparkConf,
-  KUBERNETES_NODE_SELECTOR_PREFIX)
-
-  private val executorContainerImage = sparkConf
+  // Consider moving some of these fields to KubernetesConf or 
KubernetesExecutorSpecificConf
+  private val executorExtraClasspath = 
kubernetesConf.get(EXECUTOR_CLASS_PATH)
+  private val executorContainerImage = kubernetesConf
 .get(EXECUTOR_CONTAINER_IMAGE)
 .getOrElse(throw new SparkException("Must specify the executor 
container image"))
-  private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
-  private val blockManagerPort = sparkConf
+  private val blockManagerPort = kubernetesConf
+.sparkConf
 .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
 
-  private val executorPodNamePrefix = 
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+  private val executorPodNamePrefix = kubernetesConf.appResourceNamePrefix
 
-  private val executorMemoryMiB = sparkConf.get(EXECUTOR_MEMORY)
-  private val executorMemoryString = sparkConf.get(
+  private val driverUrl = RpcEndpointAddress(
+kubernetesConf.sparkConf.get("spark.driver.host"),
+kubernetesConf.sparkConf.getInt("spark.driver.port", 
DEFAULT_DRIVER_PORT),
+CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+  private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
+  private val executorMemoryString = kubernetesConf.get(
 EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
 
-  private val memoryOverheadMiB = sparkConf
+  private val memoryOverheadMiB = kubernetesConf
 .get(EXECUTOR_MEMORY_OVERHEAD)
 .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * 

[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179241834
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -14,104 +14,66 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.scheduler.cluster.k8s
+package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, 
ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, 
PodBuilder, QuantityBuilder}
 
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesExecutorSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, 
EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
-/**
- * A factory class for bootstrapping and creating executor pods with the 
given bootstrapping
- * components.
- *
- * @param sparkConf Spark configuration
- * @param mountSecretsBootstrap an optional component for mounting 
user-specified secrets onto
- *  user-specified paths into the executor 
container
- */
-private[spark] class ExecutorPodFactory(
-sparkConf: SparkConf,
-mountSecretsBootstrap: Option[MountSecretsBootstrap]) {
-
-  private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
-
-  private val executorLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-sparkConf,
-KUBERNETES_EXECUTOR_LABEL_PREFIX)
-  require(
-!executorLabels.contains(SPARK_APP_ID_LABEL),
-s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
-  require(
-!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
-s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
-  " Spark.")
-  require(
-!executorLabels.contains(SPARK_ROLE_LABEL),
-s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is 
reserved for Spark.")
+private[spark] class BasicExecutorFeatureStep(
+kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+  extends KubernetesFeatureConfigStep {
 
-  private val executorAnnotations =
-KubernetesUtils.parsePrefixedKeyValuePairs(
-  sparkConf,
-  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
-  private val nodeSelector =
-KubernetesUtils.parsePrefixedKeyValuePairs(
-  sparkConf,
-  KUBERNETES_NODE_SELECTOR_PREFIX)
-
-  private val executorContainerImage = sparkConf
+  // Consider moving some of these fields to KubernetesConf or 
KubernetesExecutorSpecificConf
+  private val executorExtraClasspath = 
kubernetesConf.get(EXECUTOR_CLASS_PATH)
+  private val executorContainerImage = kubernetesConf
 .get(EXECUTOR_CONTAINER_IMAGE)
 .getOrElse(throw new SparkException("Must specify the executor 
container image"))
-  private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
-  private val blockManagerPort = sparkConf
+  private val blockManagerPort = kubernetesConf
+.sparkConf
 .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
 
-  private val executorPodNamePrefix = 
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+  private val executorPodNamePrefix = kubernetesConf.appResourceNamePrefix
 
-  private val executorMemoryMiB = sparkConf.get(EXECUTOR_MEMORY)
-  private val executorMemoryString = sparkConf.get(
+  private val driverUrl = RpcEndpointAddress(
+kubernetesConf.sparkConf.get("spark.driver.host"),
--- End diff --

This can be simplified to `kubernetesConf.get("spark.driver.host")`. The same
applies in some places below.
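
Note the quoted `KubernetesConf` only shows the two-argument
`get(conf, defaultValue)`, so this assumes a single-argument pass-through is
added alongside it (a newer revision of the diff, quoted earlier in this
thread, does use that form):

```
// Hypothetical delegate next to the existing get/getOption methods:
def get(conf: String): String = sparkConf.get(conf)
```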


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179239601
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesSpec.scala ---
@@ -14,25 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.k8s.submit.steps
+package org.apache.spark.deploy.k8s
 
-import org.apache.spark.deploy.k8s.MountSecretsBootstrap
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import io.fabric8.kubernetes.api.model.HasMetadata
 
-/**
- * A driver configuration step for mounting user-specified secrets onto 
user-specified paths.
- *
- * @param bootstrap a utility actually handling mounting of the secrets.
- */
-private[spark] class DriverMountSecretsStep(
-bootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
+private[spark] case class KubernetesSpec(
+pod: SparkPod,
+additionalDriverKubernetesResources: Seq[HasMetadata],
--- End diff --

I think we can simplify the name here to `driverKubernetesResources`.


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-04-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r179239212
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+mainAppResource: Option[MainAppResource],
+mainClass: String,
+appName: String,
+appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+executorId: String,
+driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
+sparkConf: SparkConf,
+roleSpecificConf: T,
+appResourceNamePrefix: String,
+appId: String,
+roleLabels: Map[String, String],
+roleAnnotations: Map[String, String],
+roleSecretNamesToMountPaths: Map[String, String],
+roleEnvs: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+.getOption("spark.jars")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+.getOption("spark.files")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def imagePullPolicy(): String = 
sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def nodeSelector(): Map[String, String] =
+KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
+
+  def get(conf: String, defaultValue: String): String = 
sparkConf.get(conf, defaultValue)
+
+  def getOption(key: String): Option[String] = sparkConf.getOption(key)
+
+}
+
+
+private[spark] object KubernetesConf {
+  def createDriverConf(
+  sparkConf: SparkConf,
+  appName: String,
+  appResourceNamePrefix: String,
+  appId: String,
+  mainAppResource: Option[MainAppResource],
+  mainClass: String,
+  appArgs: Array[String]): 
KubernetesConf[KubernetesDriverSpecificConf] = {
+val sparkConfWithMainAppJar = sparkConf.clone()
+mainAppResource.foreach {
+  case JavaMainAppResource(res) =>
+val previousJars = sparkConf
+  .getOption("spark.jars")
+  .map(_.split(","))
+  .getOrElse(Array.empty)
+if (!previousJars.contains(res)) {
+  sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+}
+}
+
+val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_DRIVER_LABEL_PREFIX)
+require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with 
key " +
+  s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with 
key " +
+  s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+val driverLabels = driverCustomLabels ++ Map(
+  SPARK_APP_ID_LABEL -> appId,
+  SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+val driverAnnotations =
+  KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 

[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-27 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177545717
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+  mainAppResource: Option[MainAppResource],
+  mainClass: String,
+  appName: String,
+  appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+  executorId: String, driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
+  val sparkConf: SparkConf,
+  val roleSpecificConf: T,
+  val appResourceNamePrefix: String,
+  val appId: String,
+  val roleLabels: Map[String, String],
+  val roleAnnotations: Map[String, String],
+  val roleSecretNamesToMountPaths: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+.getOption("spark.jars")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+.getOption("spark.files")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def driverCustomEnvs(): Seq[(String, String)] =
+sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+
+  def imagePullPolicy(): String = 
sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def nodeSelector(): Map[String, String] =
+KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
+
+  def get(conf: String, defaultValue: String): String = 
sparkConf.get(conf, defaultValue)
+
+  def getOption(key: String): Option[String] = sparkConf.getOption(key)
+
--- End diff --

Ok, will address in the next patch after others review.


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-27 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177507771
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+  mainAppResource: Option[MainAppResource],
+  mainClass: String,
+  appName: String,
+  appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+  executorId: String, driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
+  val sparkConf: SparkConf,
+  val roleSpecificConf: T,
+  val appResourceNamePrefix: String,
+  val appId: String,
+  val roleLabels: Map[String, String],
+  val roleAnnotations: Map[String, String],
+  val roleSecretNamesToMountPaths: Map[String, String],
+  val roleEnvs: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+.getOption("spark.jars")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+.getOption("spark.files")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def imagePullPolicy(): String = 
sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def nodeSelector(): Map[String, String] =
+KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
+
+  def get(conf: String, defaultValue: String): String = 
sparkConf.get(conf, defaultValue)
+
+  def getOption(key: String): Option[String] = sparkConf.getOption(key)
+
+}
+
+
+private[spark] object KubernetesConf {
+  def createDriverConf(
+sparkConf: SparkConf,
+appName: String,
+appResourceNamePrefix: String,
+appId: String,
+mainAppResource: Option[MainAppResource],
+mainClass: String,
+appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] 
= {
+val sparkConfWithMainAppJar = sparkConf.clone()
+mainAppResource.foreach {
+  case JavaMainAppResource(res) =>
+val previousJars = sparkConf
+  .getOption("spark.jars")
+  .map(_.split(","))
+  .getOrElse(Array.empty)
+if (!previousJars.contains(res)) {
+  sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+}
+}
+
+val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_DRIVER_LABEL_PREFIX)
+require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with 
key " +
+  s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with 
key " +
+  s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+val driverLabels = driverCustomLabels ++ Map(
+  SPARK_APP_ID_LABEL -> appId,
+  SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+val driverAnnotations =
+  KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+val 

[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177504889
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+  mainAppResource: Option[MainAppResource],
+  mainClass: String,
+  appName: String,
+  appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+  executorId: String, driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
+  val sparkConf: SparkConf,
+  val roleSpecificConf: T,
+  val appResourceNamePrefix: String,
+  val appId: String,
+  val roleLabels: Map[String, String],
+  val roleAnnotations: Map[String, String],
+  val roleSecretNamesToMountPaths: Map[String, String],
+  val roleEnvs: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+.getOption("spark.jars")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+.getOption("spark.files")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def imagePullPolicy(): String = 
sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def nodeSelector(): Map[String, String] =
+KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
+
+  def get(conf: String, defaultValue: String): String = 
sparkConf.get(conf, defaultValue)
+
+  def getOption(key: String): Option[String] = sparkConf.getOption(key)
+
+}
+
+
+private[spark] object KubernetesConf {
+  def createDriverConf(
+sparkConf: SparkConf,
+appName: String,
+appResourceNamePrefix: String,
+appId: String,
+mainAppResource: Option[MainAppResource],
+mainClass: String,
+appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] 
= {
+val sparkConfWithMainAppJar = sparkConf.clone()
+mainAppResource.foreach {
+  case JavaMainAppResource(res) =>
+val previousJars = sparkConf
+  .getOption("spark.jars")
+  .map(_.split(","))
+  .getOrElse(Array.empty)
+if (!previousJars.contains(res)) {
+  sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+}
+}
+
+val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_DRIVER_LABEL_PREFIX)
+require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with 
key " +
+  s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with 
key " +
+  s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+val driverLabels = driverCustomLabels ++ Map(
+  SPARK_APP_ID_LABEL -> appId,
+  SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+val driverAnnotations =
+  KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+

[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-27 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177502836
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+  mainAppResource: Option[MainAppResource],
+  mainClass: String,
+  appName: String,
+  appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+  executorId: String, driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
+  val sparkConf: SparkConf,
+  val roleSpecificConf: T,
+  val appResourceNamePrefix: String,
+  val appId: String,
+  val roleLabels: Map[String, String],
+  val roleAnnotations: Map[String, String],
+  val roleSecretNamesToMountPaths: Map[String, String],
+  val roleEnvs: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+.getOption("spark.jars")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+.getOption("spark.files")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def imagePullPolicy(): String = 
sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def nodeSelector(): Map[String, String] =
+KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
+
+  def get(conf: String, defaultValue: String): String = 
sparkConf.get(conf, defaultValue)
+
+  def getOption(key: String): Option[String] = sparkConf.getOption(key)
+
+}
+
+
+private[spark] object KubernetesConf {
+  def createDriverConf(
+sparkConf: SparkConf,
+appName: String,
+appResourceNamePrefix: String,
+appId: String,
+mainAppResource: Option[MainAppResource],
+mainClass: String,
+appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] 
= {
+val sparkConfWithMainAppJar = sparkConf.clone()
+mainAppResource.foreach {
+  case JavaMainAppResource(res) =>
+val previousJars = sparkConf
+  .getOption("spark.jars")
+  .map(_.split(","))
+  .getOrElse(Array.empty)
+if (!previousJars.contains(res)) {
+  sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+}
+}
+
+val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_DRIVER_LABEL_PREFIX)
+require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with 
key " +
+  s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with 
key " +
+  s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+val driverLabels = driverCustomLabels ++ Map(
+  SPARK_APP_ID_LABEL -> appId,
+  SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+val driverAnnotations =
+  KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+val 

[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177293958
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+  mainAppResource: Option[MainAppResource],
+  mainClass: String,
+  appName: String,
+  appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+  executorId: String, driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
+  val sparkConf: SparkConf,
+  val roleSpecificConf: T,
+  val appResourceNamePrefix: String,
+  val appId: String,
+  val roleLabels: Map[String, String],
+  val roleAnnotations: Map[String, String],
+  val roleSecretNamesToMountPaths: Map[String, String],
+  val roleEnvs: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+.getOption("spark.jars")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+.getOption("spark.files")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def imagePullPolicy(): String = 
sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def nodeSelector(): Map[String, String] =
+KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
+
+  def get(conf: String, defaultValue: String): String = 
sparkConf.get(conf, defaultValue)
+
+  def getOption(key: String): Option[String] = sparkConf.getOption(key)
+
+}
+
+
+private[spark] object KubernetesConf {
+  def createDriverConf(
+sparkConf: SparkConf,
+appName: String,
+appResourceNamePrefix: String,
+appId: String,
+mainAppResource: Option[MainAppResource],
+mainClass: String,
+appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] 
= {
+val sparkConfWithMainAppJar = sparkConf.clone()
+mainAppResource.foreach {
+  case JavaMainAppResource(res) =>
+val previousJars = sparkConf
+  .getOption("spark.jars")
+  .map(_.split(","))
+  .getOrElse(Array.empty)
+if (!previousJars.contains(res)) {
+  sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+}
+}
+
+val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_DRIVER_LABEL_PREFIX)
+require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with 
key " +
+  s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with 
key " +
+  s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+val driverLabels = driverCustomLabels ++ Map(
+  SPARK_APP_ID_LABEL -> appId,
+  SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+val driverAnnotations =
+  KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+

[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177290859
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+  mainAppResource: Option[MainAppResource],
+  mainClass: String,
+  appName: String,
+  appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+  executorId: String, driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
+  val sparkConf: SparkConf,
+  val roleSpecificConf: T,
+  val appResourceNamePrefix: String,
+  val appId: String,
+  val roleLabels: Map[String, String],
+  val roleAnnotations: Map[String, String],
+  val roleSecretNamesToMountPaths: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+.getOption("spark.jars")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+.getOption("spark.files")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def driverCustomEnvs(): Seq[(String, String)] =
+sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+
+  def imagePullPolicy(): String = 
sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def nodeSelector(): Map[String, String] =
+KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
+
+  def get(conf: String, defaultValue: String): String = 
sparkConf.get(conf, defaultValue)
+
+  def getOption(key: String): Option[String] = sparkConf.getOption(key)
+
--- End diff --

Oh, I meant removing the extra newline.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177290805
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
 ---
@@ -14,17 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.k8s.submit.steps
+package org.apache.spark.deploy.k8s
 
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, 
PodBuilder}
 
-/**
- * Represents a step in configuring the Spark driver pod.
- */
-private[spark] trait DriverConfigurationStep {
+private[spark] case class SparkPod(pod: Pod, container: Container)
 
-  /**
-   * Apply some transformation to the previous state of the driver to add 
a new feature to it.
-   */
-  def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec
+private[spark] object SparkPod {
+  def initialPod(): SparkPod = {
+SparkPod(
+  new 
PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
--- End diff --

Got it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177277920
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+  mainAppResource: Option[MainAppResource],
+  mainClass: String,
+  appName: String,
+  appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+  executorId: String, driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
+  val sparkConf: SparkConf,
+  val roleSpecificConf: T,
+  val appResourceNamePrefix: String,
+  val appId: String,
+  val roleLabels: Map[String, String],
+  val roleAnnotations: Map[String, String],
+  val roleSecretNamesToMountPaths: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+.getOption("spark.jars")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+.getOption("spark.files")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def driverCustomEnvs(): Seq[(String, String)] =
+sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+
+  def imagePullPolicy(): String = 
sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def nodeSelector(): Map[String, String] =
+KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
+
+  def get(conf: String, defaultValue: String): String = 
sparkConf.get(conf, defaultValue)
+
+  def getOption(key: String): Option[String] = sparkConf.getOption(key)
+
--- End diff --

Sorry, do you mean we should remove this newline, or that one should be 
added here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177277876
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+  mainAppResource: Option[MainAppResource],
+  mainClass: String,
+  appName: String,
+  appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+  executorId: String, driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
--- End diff --

Maybe this should be a `case class`? It's a struct-like object, which 
inclines me to think a `case class` would be more idiomatic here.
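
For reference, a rough sketch of that shape (same fields and imports as 
the quoted diff; this is only the suggested form, not the final code):

    private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
        sparkConf: SparkConf,
        roleSpecificConf: T,
        appResourceNamePrefix: String,
        appId: String,
        roleLabels: Map[String, String],
        roleAnnotations: Map[String, String],
        roleSecretNamesToMountPaths: Map[String, String])

Same members, but equals/hashCode/copy come for free on a struct-like holder.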


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177273537
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
 ---
@@ -180,20 +167,17 @@ private[spark] class Client(
   }
 
   // Build a Config Map that will house spark conf properties in a single 
file for spark-submit
-  private def buildConfigMap(configMapName: String, conf: SparkConf): 
ConfigMap = {
+  private def buildConfigMap(configMapName: String, conf: Map[String, 
String]): ConfigMap = {
 val properties = new Properties()
-conf.getAll.foreach { case (k, v) =>
+conf.foreach { case (k, v) =>
   properties.setProperty(k, v)
 }
 val propertiesWriter = new StringWriter()
 properties.store(propertiesWriter,
   s"Java properties built from Kubernetes config map with name: 
$configMapName")
-
-val namespace = conf.get(KUBERNETES_NAMESPACE)
 new ConfigMapBuilder()
   .withNewMetadata()
 .withName(configMapName)
-.withNamespace(namespace)
--- End diff --

It's not necessary to set namespaces on these objects because the 
Kubernetes client itself is namespaced.
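
A minimal sketch of that scoping with the fabric8 client (the namespace 
and ConfigMap name here are illustrative, not what the submission code uses):

    import io.fabric8.kubernetes.api.model.ConfigMapBuilder
    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    object NamespacedClientSketch {
      // The client is scoped to a single namespace once, up front...
      val client = new DefaultKubernetesClient().inNamespace("spark")

      // ...so objects created through it need no withNamespace(...) in metadata.
      val configMap = new ConfigMapBuilder()
        .withNewMetadata()
          .withName("spark-driver-conf-map")
        .endMetadata()
        .build()

      client.configMaps().create(configMap)  // lands in the "spark" namespace
    }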


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177273458
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
 ---
@@ -14,17 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.k8s.submit.steps
+package org.apache.spark.deploy.k8s
 
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, 
PodBuilder}
 
-/**
- * Represents a step in configuring the Spark driver pod.
- */
-private[spark] trait DriverConfigurationStep {
+private[spark] case class SparkPod(pod: Pod, container: Container)
 
-  /**
-   * Apply some transformation to the previous state of the driver to add 
a new feature to it.
-   */
-  def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec
+private[spark] object SparkPod {
+  def initialPod(): SparkPod = {
+SparkPod(
+  new 
PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
--- End diff --

Sort of. It allows everything that consumes one of these to use 
`editMetadata()` or `editOrNewMetadata()` when creating features. If you 
don't initialize the metadata and spec, and a downstream caller then tries 
to invoke `editMetadata()`, we throw an NPE.
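
A minimal sketch of that failure mode, assuming the fabric8 builders 
quoted above behave as described:

    import io.fabric8.kubernetes.api.model.PodBuilder

    object InitialPodSketch {
      // Built without initializing metadata/spec; getMetadata is null here:
      val bare = new PodBuilder().build()
      // A feature step doing the following would NPE on the null metadata:
      //   new PodBuilder(bare).editMetadata().addToLabels("k", "v").endMetadata().build()

      // Initializing empty sections up front keeps editMetadata() safe downstream:
      val initial = new PodBuilder()
        .withNewMetadata().endMetadata()
        .withNewSpec().endSpec()
        .build()
      val labeled = new PodBuilder(initial)
        .editMetadata().addToLabels("spark-role", "driver").endMetadata()
        .build()
    }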


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177267646
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, 
EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+
+private[spark] class BasicDriverFeatureStep(
+  kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
--- End diff --

Should we rename this to `driverConf`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177266945
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+  mainAppResource: Option[MainAppResource],
+  mainClass: String,
+  appName: String,
+  appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+  executorId: String, driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
+  val sparkConf: SparkConf,
+  val roleSpecificConf: T,
+  val appResourceNamePrefix: String,
+  val appId: String,
+  val roleLabels: Map[String, String],
+  val roleAnnotations: Map[String, String],
+  val roleSecretNamesToMountPaths: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+.getOption("spark.jars")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+.getOption("spark.files")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def driverCustomEnvs(): Seq[(String, String)] =
+sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+
+  def imagePullPolicy(): String = 
sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def nodeSelector(): Map[String, String] =
+KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
+
+  def get(conf: String, defaultValue: String): String = 
sparkConf.get(conf, defaultValue)
+
+  def getOption(key: String): Option[String] = sparkConf.getOption(key)
+
+}
+
+private[spark] object KubernetesConf {
+  def createDriverConf(
+sparkConf: SparkConf,
+appName: String,
+appResourceNamePrefix: String,
+appId: String,
+mainAppResource: Option[MainAppResource],
+mainClass: String,
+appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] 
= {
+val sparkConfWithMainAppJar = sparkConf.clone()
+mainAppResource.foreach {
+  case JavaMainAppResource(res) =>
+val previousJars = sparkConf
+  .getOption("spark.jars")
+  .map(_.split(","))
+  .getOrElse(Array.empty)
+if (!previousJars.contains(res)) {
+  sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+}
+}
+val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_DRIVER_LABEL_PREFIX)
+require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with 
key " +
+  s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with 
key " +
+  s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+val driverLabels = driverCustomLabels ++ Map(
+  SPARK_APP_ID_LABEL -> appId,
+  SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+val driverAnnotations =
+  

[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177270542
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
 ---
@@ -180,20 +167,17 @@ private[spark] class Client(
   }
 
   // Build a Config Map that will house spark conf properties in a single 
file for spark-submit
-  private def buildConfigMap(configMapName: String, conf: SparkConf): 
ConfigMap = {
+  private def buildConfigMap(configMapName: String, conf: Map[String, 
String]): ConfigMap = {
 val properties = new Properties()
-conf.getAll.foreach { case (k, v) =>
+conf.foreach { case (k, v) =>
   properties.setProperty(k, v)
 }
 val propertiesWriter = new StringWriter()
 properties.store(propertiesWriter,
   s"Java properties built from Kubernetes config map with name: 
$configMapName")
-
-val namespace = conf.get(KUBERNETES_NAMESPACE)
 new ConfigMapBuilder()
   .withNewMetadata()
 .withName(configMapName)
-.withNamespace(namespace)
--- End diff --

Why was this removed?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177266807
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+  mainAppResource: Option[MainAppResource],
+  mainClass: String,
+  appName: String,
+  appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+  executorId: String, driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
+  val sparkConf: SparkConf,
+  val roleSpecificConf: T,
+  val appResourceNamePrefix: String,
+  val appId: String,
+  val roleLabels: Map[String, String],
+  val roleAnnotations: Map[String, String],
+  val roleSecretNamesToMountPaths: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+.getOption("spark.jars")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+.getOption("spark.files")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def driverCustomEnvs(): Seq[(String, String)] =
+sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+
+  def imagePullPolicy(): String = 
sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def nodeSelector(): Map[String, String] =
+KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
+
+  def get(conf: String, defaultValue: String): String = 
sparkConf.get(conf, defaultValue)
+
+  def getOption(key: String): Option[String] = sparkConf.getOption(key)
+
+}
+
+private[spark] object KubernetesConf {
+  def createDriverConf(
+sparkConf: SparkConf,
+appName: String,
+appResourceNamePrefix: String,
+appId: String,
+mainAppResource: Option[MainAppResource],
+mainClass: String,
+appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] 
= {
+val sparkConfWithMainAppJar = sparkConf.clone()
+mainAppResource.foreach {
+  case JavaMainAppResource(res) =>
+val previousJars = sparkConf
+  .getOption("spark.jars")
+  .map(_.split(","))
+  .getOrElse(Array.empty)
+if (!previousJars.contains(res)) {
+  sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+}
+}
+val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_DRIVER_LABEL_PREFIX)
+require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with 
key " +
+  s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with 
key " +
+  s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+val driverLabels = driverCustomLabels ++ Map(
+  SPARK_APP_ID_LABEL -> appId,
+  SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+val driverAnnotations =
+  

[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177266854
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+  mainAppResource: Option[MainAppResource],
+  mainClass: String,
+  appName: String,
+  appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+  executorId: String, driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
+  val sparkConf: SparkConf,
+  val roleSpecificConf: T,
+  val appResourceNamePrefix: String,
+  val appId: String,
+  val roleLabels: Map[String, String],
+  val roleAnnotations: Map[String, String],
+  val roleSecretNamesToMountPaths: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+.getOption("spark.jars")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+.getOption("spark.files")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def driverCustomEnvs(): Seq[(String, String)] =
+sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+
+  def imagePullPolicy(): String = 
sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def nodeSelector(): Map[String, String] =
+KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
+
+  def get(conf: String, defaultValue: String): String = 
sparkConf.get(conf, defaultValue)
+
+  def getOption(key: String): Option[String] = sparkConf.getOption(key)
+
+}
+
+private[spark] object KubernetesConf {
+  def createDriverConf(
+sparkConf: SparkConf,
+appName: String,
+appResourceNamePrefix: String,
+appId: String,
+mainAppResource: Option[MainAppResource],
+mainClass: String,
+appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] 
= {
+val sparkConfWithMainAppJar = sparkConf.clone()
+mainAppResource.foreach {
+  case JavaMainAppResource(res) =>
+val previousJars = sparkConf
+  .getOption("spark.jars")
+  .map(_.split(","))
+  .getOrElse(Array.empty)
+if (!previousJars.contains(res)) {
+  sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+}
+}
+val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
--- End diff --

Can you add a new line before this line?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177267477
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesSpec.scala
 ---
@@ -14,25 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.k8s.submit.steps
+package org.apache.spark.deploy.k8s
 
-import org.apache.spark.deploy.k8s.MountSecretsBootstrap
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import io.fabric8.kubernetes.api.model.HasMetadata
 
-/**
- * A driver configuration step for mounting user-specified secrets onto 
user-specified paths.
- *
- * @param bootstrap a utility actually handling mounting of the secrets.
- */
-private[spark] class DriverMountSecretsStep(
-bootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
+private[k8s] case class KubernetesSpec(
+  pod: SparkPod,
+  additionalDriverKubernetesResources: Seq[HasMetadata],
+  podJavaSystemProperties: Map[String, String])
--- End diff --

Can we shorten the name to just `systemProperties`? One of the most 
frequent comments I got while working on upstreaming was to use shorter 
names.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177269929
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesSpec}
+import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, 
DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, 
MountSecretsFeatureStep}
+
+private[spark] class KubernetesDriverBuilder(
+  provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => 
BasicDriverFeatureStep =
+new BasicDriverFeatureStep(_),
+  provideCredentialsStep: (KubernetesConf[KubernetesDriverSpecificConf])
+=> DriverKubernetesCredentialsFeatureStep =
+new DriverKubernetesCredentialsFeatureStep(_),
+  provideServiceStep: (KubernetesConf[KubernetesDriverSpecificConf]) => 
DriverServiceFeatureStep =
+new DriverServiceFeatureStep(_),
+  provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
+=> MountSecretsFeatureStep) =
+new MountSecretsFeatureStep(_)) {
+
+  def buildFromFeatures(
+kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): 
KubernetesSpec = {
+val baseFeatures = Seq(
+  provideBasicStep(kubernetesConf),
+  provideCredentialsStep(kubernetesConf),
+  provideServiceStep(kubernetesConf))
+val allFeatures = if 
(kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+  baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
+} else baseFeatures
+var spec = 
KubernetesSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
--- End diff --

Can you add a new line before this line?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177265926
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+  mainAppResource: Option[MainAppResource],
+  mainClass: String,
+  appName: String,
+  appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+  executorId: String, driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
+  val sparkConf: SparkConf,
+  val roleSpecificConf: T,
+  val appResourceNamePrefix: String,
+  val appId: String,
+  val roleLabels: Map[String, String],
+  val roleAnnotations: Map[String, String],
+  val roleSecretNamesToMountPaths: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+.getOption("spark.jars")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+.getOption("spark.files")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def driverCustomEnvs(): Seq[(String, String)] =
--- End diff --

This is driver-specific and probably should not be here. What about making 
custom envs a constructor argument of the class, similar to labels and 
annotations? Then `createDriverConf` below would read the driver custom envs 
and pass them in. Even though we don't have a key for specifying custom 
executor envs, we may end up adding one in the future and needing the same 
mechanism here; see the sketch below.
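
A minimal sketch of that suggestion (class and method names here are 
simplified stand-ins; `getAllWithPrefix` is the existing SparkConf API, and 
"spark.kubernetes.driverEnv." is the prefix `KUBERNETES_DRIVER_ENV_KEY` 
points at):

    import org.apache.spark.SparkConf

    object KubernetesConfSketch {
      // Role-scoped envs arrive through the constructor, mirroring
      // roleLabels/roleAnnotations, instead of being read inside the class.
      class RoleConf(val sparkConf: SparkConf, val roleEnvs: Map[String, String])

      def createDriverConf(sparkConf: SparkConf): RoleConf = {
        // Collect driver-only env vars at construction time.
        val driverEnvs =
          sparkConf.getAllWithPrefix("spark.kubernetes.driverEnv.").toMap
        new RoleConf(sparkConf, driverEnvs)
      }
    }

Executors would then pass their own (currently empty) map through the same 
parameter.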


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177267577
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
 ---
@@ -14,17 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.k8s.submit.steps
+package org.apache.spark.deploy.k8s
 
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, 
PodBuilder}
 
-/**
- * Represents a step in configuring the Spark driver pod.
- */
-private[spark] trait DriverConfigurationStep {
+private[spark] case class SparkPod(pod: Pod, container: Container)
 
-  /**
-   * Apply some transformation to the previous state of the driver to add 
a new feature to it.
-   */
-  def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec
+private[spark] object SparkPod {
+  def initialPod(): SparkPod = {
+SparkPod(
+  new 
PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
--- End diff --

Do you need `.withNewMetadata().endMetadata().withNewSpec().endSpec()` here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177265950
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesDriverSpecificConf(
+  mainAppResource: Option[MainAppResource],
+  mainClass: String,
+  appName: String,
+  appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+private[spark] case class KubernetesExecutorSpecificConf(
+  executorId: String, driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf](
+  val sparkConf: SparkConf,
+  val roleSpecificConf: T,
+  val appResourceNamePrefix: String,
+  val appId: String,
+  val roleLabels: Map[String, String],
+  val roleAnnotations: Map[String, String],
+  val roleSecretNamesToMountPaths: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+.getOption("spark.jars")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+.getOption("spark.files")
+.map(str => str.split(",").toSeq)
+.getOrElse(Seq.empty[String])
+
+  def driverCustomEnvs(): Seq[(String, String)] =
+sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+
+  def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def nodeSelector(): Map[String, String] =
+KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
+
+  def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue)
+
+  def getOption(key: String): Option[String] = sparkConf.getOption(key)
+
--- End diff --

Extra blank line.


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177265297
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesSpec.scala ---
@@ -14,25 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.k8s.submit.steps
+package org.apache.spark.deploy.k8s
 
-import org.apache.spark.deploy.k8s.MountSecretsBootstrap
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import io.fabric8.kubernetes.api.model.HasMetadata
 
-/**
- * A driver configuration step for mounting user-specified secrets onto user-specified paths.
- *
- * @param bootstrap a utility actually handling mounting of the secrets.
- */
-private[spark] class DriverMountSecretsStep(
-bootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
+private[k8s] case class KubernetesSpec(
+  pod: SparkPod,
+  additionalDriverKubernetesResources: Seq[HasMetadata],
+  podJavaSystemProperties: Map[String, String])
 
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-val pod = bootstrap.addSecretVolumes(driverSpec.driverPod)
-val container = bootstrap.mountSecrets(driverSpec.driverContainer)
-driverSpec.copy(
-  driverPod = pod,
-  driverContainer = container
-)
-  }
+private[k8s] object KubernetesSpec {
--- End diff --

Probably should just be `private[spark]`
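
For reference, the difference in scope (illustrative snippet, not from the diff):

```scala
package org.apache.spark.deploy.k8s

// private[k8s]   -> visible from org.apache.spark.deploy.k8s and subpackages only.
// private[spark] -> visible from everything under org.apache.spark, matching the
//                   modifier used on the other classes touched by this refactor.
private[spark] case class Example(value: String)
```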


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20910#discussion_r177264954
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ---
@@ -211,34 +195,34 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
   }
 
   private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
-val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
+val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
 // For constructing the app ID, we can't use the Spark application name, as the app ID is going
 // to be added as a label to group resources belonging to the same application. Label values are
 // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
 // a unique app ID (captured by spark.app.id) in the format below.
 val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
 val launchTime = System.currentTimeMillis()
 val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
-val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
 val kubernetesResourceNamePrefix = {
   s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
 }
+val kubernetesConf = KubernetesConf.createDriverConf(
+  sparkConf,
+  appName,
+  kubernetesResourceNamePrefix,
+  kubernetesAppId,
+  clientArguments.mainAppResource,
+  clientArguments.mainClass,
+  clientArguments.driverArgs)
+val orchestrator = new KubernetesDriverBuilder
--- End diff --

Should be `builder`, since this is now a `KubernetesDriverBuilder` rather than 
an orchestrator.
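
Separately, the quoted context shows how the two identifiers are derived; a 
runnable sketch of the same expressions (sample values are made up):

```scala
import java.util.UUID

val appName = "spark-pi"           // hypothetical spark.app.name
val launchTime = 1522103206000L    // hypothetical System.currentTimeMillis()

// Label-safe app ID: a UUID with dashes stripped, e.g. "spark-3f4e0b..."
val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"

// Resource name prefix: lowercased, dots replaced with dashes,
// e.g. "spark-pi-1522103206000"
val kubernetesResourceNamePrefix =
  s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
```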


---




[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...

2018-03-26 Thread mccheah
GitHub user mccheah opened a pull request:

https://github.com/apache/spark/pull/20910

[SPARK-22839] [K8s] Refactor to unify driver and executor pod builder APIs

## What changes were proposed in this pull request?

Breaks down the construction of driver pods and executor pods into a common 
abstraction used both by spark-submit when creating the driver and by 
KubernetesClusterSchedulerBackend when creating executors. This encourages more 
code reuse and is more legible than the older approach; a condensed sketch of 
the new abstraction follows below.

The high-level design is discussed in more detail on the JIRA ticket. This 
pull request is the implementation of that design with some minor changes in 
the implementation details.

No user-facing behavior should break as a result of this change.
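
A condensed sketch of the shared mechanism (`SparkPod` is from the diff; the 
feature trait and builder names below are illustrative):

```scala
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

case class SparkPod(pod: Pod, container: Container)

// Each feature transforms the pod-under-construction.
trait FeatureStep {
  def configurePod(pod: SparkPod): SparkPod
}

// A builder folds the configured features over an initial empty pod, so the
// same mechanism serves spark-submit (driver) and the scheduler backend
// (executors); only the list of features differs per role.
class PodBuilderSketch(features: Seq[FeatureStep]) {
  def buildPod(): SparkPod = {
    val initial = SparkPod(
      new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
      new ContainerBuilder().build())
    features.foldLeft(initial)((pod, feature) => feature.configurePod(pod))
  }
}
```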

## How was this patch tested?

Migrated all unit tests from the old submission-steps architecture to the new 
architecture. Integration tests should not have to change, and should still 
pass, given that this refactor shouldn't change any outward behavior.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mccheah/spark spark-22839-incremental

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20910.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20910


commit f1b8c08f90b3d3a689e0206d8f8c71c14bb91899
Author: mcheah 
Date:   2018-03-26T22:17:46Z

Fundamental building blocks for the new pod construction architecture.

commit 80e1562f4ec73f04a3009d16739f9d147625cfe8
Author: mcheah 
Date:   2018-03-26T22:22:00Z

Intermediate commit to move file.

commit c3460ae49a3206a4abd1e386caeacca249cb2e4e
Author: mcheah 
Date:   2018-03-26T22:26:31Z

Move basic driver configuration to new architecture.

commit 6d1711bbe8601340f5bfdc740177294cb5dd969c
Author: mcheah 
Date:   2018-03-26T22:28:46Z

Intermediate commit to move file

commit 4036d724268a8e9915f32838bacaff76c0754ada
Author: mcheah 
Date:   2018-03-26T22:31:20Z

Migrate mounting K8s credentials to the new architecture.

commit d46d6712b52247b43ecc9f8404c44c7eafba97ad
Author: mcheah 
Date:   2018-03-26T22:33:19Z

Intermediate commit to move file.

commit 2936aa5d7c98b15dcb37ecf8022aa7313062fb81
Author: mcheah 
Date:   2018-03-26T22:36:25Z

Migrate creating the driver service to the new architecture.

commit d2751b6696522bd775d6d9907574ebbe6aea6434
Author: mcheah 
Date:   2018-03-26T22:37:31Z

Remove dependency resolution step entirely.

It's now subsumed by the basicdriverfeaturestep.

commit 430fbb2a7d6559fd6c32a80a77e9d553df419223
Author: mcheah 
Date:   2018-03-26T22:43:01Z

Move mounting driver secrets to new architecture.

commit fd3e8e6eae52af46c9eafe569890ae225a11c596
Author: mcheah 
Date:   2018-03-26T22:48:18Z

Complete driver migration to new pod construction architecture.

commit f0ea6d9a87aa24c47dbcc3cf6e5d73a033a72601
Author: mcheah 
Date:   2018-03-26T22:49:33Z

Intermediate commit to move file

commit 67e9ca165282d05c3b3ada46827bfd0756e5d1c3
Author: mcheah 
Date:   2018-03-26T22:53:15Z

Migrate executor pod construction to use the new architecture.




---
