Repository: spark
Updated Branches:
  refs/heads/master 0323e6146 -> a83ae0d9b


http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
new file mode 100644
index 0000000..9d02f56
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SecretVolumeUtils, SparkPod}
+
+class MountSecretsFeatureStepSuite extends SparkFunSuite {
+
+  private val SECRET_FOO = "foo"
+  private val SECRET_BAR = "bar"
+  private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
+
+  test("mounts all given secrets") {
+    val baseDriverPod = SparkPod.initialPod()
+    val secretNamesToMountPaths = Map(
+      SECRET_FOO -> SECRET_MOUNT_PATH,
+      SECRET_BAR -> SECRET_MOUNT_PATH)
+    val sparkConf = new SparkConf(false)
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesExecutorSpecificConf("1", new PodBuilder().build()),
+      "resource-name-prefix",
+      "app-id",
+      Map.empty,
+      Map.empty,
+      secretNamesToMountPaths,
+      Map.empty)
+
+    val step = new MountSecretsFeatureStep(kubernetesConf)
+    val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod
+    val driverContainerWithSecretsMounted = step.configurePod(baseDriverPod).container
+
+    Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName =>
+      assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, volumeName))
+    }
+    Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName =>
+      assert(SecretVolumeUtils.containerHasVolume(
+        driverContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH))
+    }
+  }
+}
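
Note on the helpers exercised above: SecretVolumeUtils is not part of this diff. A minimal sketch of what it plausibly looks like, inferred from how podHasVolume and containerHasVolume are called here (the actual helper in the k8s test sources may differ in detail):

    import scala.collection.JavaConverters._

    import io.fabric8.kubernetes.api.model.{Container, Pod}

    object SecretVolumeUtils {

      // True if the pod spec declares a volume with the given name.
      def podHasVolume(pod: Pod, volumeName: String): Boolean =
        pod.getSpec.getVolumes.asScala.exists(_.getName == volumeName)

      // True if the container mounts the named volume at the given path.
      def containerHasVolume(
          container: Container,
          volumeName: String,
          mountPath: String): Boolean =
        container.getVolumeMounts.asScala.exists { mount =>
          mount.getName == volumeName && mount.getMountPath == mountPath
        }
    }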

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 6a50159..c1b203e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -16,22 +16,17 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
-import scala.collection.JavaConverters._
-
-import com.google.common.collect.Iterables
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
 import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
 import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.Mockito.{doReturn, verify, when}
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfter
 import org.scalatest.mockito.MockitoSugar._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
 
 class ClientSuite extends SparkFunSuite with BeforeAndAfter {
 
@@ -39,6 +34,74 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
   private val DRIVER_POD_API_VERSION = "v1"
   private val DRIVER_POD_KIND = "pod"
   private val KUBERNETES_RESOURCE_PREFIX = "resource-example"
+  private val POD_NAME = "driver"
+  private val CONTAINER_NAME = "container"
+  private val APP_ID = "app-id"
+  private val APP_NAME = "app"
+  private val MAIN_CLASS = "main"
+  private val APP_ARGS = Seq("arg1", "arg2")
+  private val RESOLVED_JAVA_OPTIONS = Map(
+    "conf1key" -> "conf1value",
+    "conf2key" -> "conf2value")
+  private val BUILT_DRIVER_POD =
+    new PodBuilder()
+      .withNewMetadata()
+        .withName(POD_NAME)
+        .endMetadata()
+      .withNewSpec()
+        .withHostname("localhost")
+        .endSpec()
+      .build()
+  private val BUILT_DRIVER_CONTAINER = new ContainerBuilder().withName(CONTAINER_NAME).build()
+  private val ADDITIONAL_RESOURCES = Seq(
+    new SecretBuilder().withNewMetadata().withName("secret").endMetadata().build())
+
+  private val BUILT_KUBERNETES_SPEC = KubernetesDriverSpec(
+    SparkPod(BUILT_DRIVER_POD, BUILT_DRIVER_CONTAINER),
+    ADDITIONAL_RESOURCES,
+    RESOLVED_JAVA_OPTIONS)
+
+  private val FULL_EXPECTED_CONTAINER = new ContainerBuilder(BUILT_DRIVER_CONTAINER)
+    .addNewEnv()
+      .withName(ENV_SPARK_CONF_DIR)
+      .withValue(SPARK_CONF_DIR_INTERNAL)
+      .endEnv()
+    .addNewVolumeMount()
+      .withName(SPARK_CONF_VOLUME)
+      .withMountPath(SPARK_CONF_DIR_INTERNAL)
+      .endVolumeMount()
+    .build()
+  private val FULL_EXPECTED_POD = new PodBuilder(BUILT_DRIVER_POD)
+    .editSpec()
+      .addToContainers(FULL_EXPECTED_CONTAINER)
+      .addNewVolume()
+        .withName(SPARK_CONF_VOLUME)
+        .withNewConfigMap().withName(s"$KUBERNETES_RESOURCE_PREFIX-driver-conf-map").endConfigMap()
+        .endVolume()
+      .endSpec()
+    .build()
+
+  private val POD_WITH_OWNER_REFERENCE = new PodBuilder(FULL_EXPECTED_POD)
+    .editMetadata()
+      .withUid(DRIVER_POD_UID)
+      .endMetadata()
+    .withApiVersion(DRIVER_POD_API_VERSION)
+    .withKind(DRIVER_POD_KIND)
+    .build()
+
+  private val ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES = ADDITIONAL_RESOURCES.map { secret =>
+    new SecretBuilder(secret)
+      .editMetadata()
+        .addNewOwnerReference()
+          .withName(POD_NAME)
+          .withApiVersion(DRIVER_POD_API_VERSION)
+          .withKind(DRIVER_POD_KIND)
+          .withController(true)
+          .withUid(DRIVER_POD_UID)
+          .endOwnerReference()
+        .endMetadata()
+      .build()
+  }
 
  private type ResourceList = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
       HasMetadata, Boolean]
@@ -57,112 +120,85 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
   private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _
 
   @Mock
+  private var driverBuilder: KubernetesDriverBuilder = _
+
+  @Mock
   private var resourceList: ResourceList = _
 
-  private val submissionSteps = Seq(FirstTestConfigurationStep, SecondTestConfigurationStep)
+  private var kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf] = _
+
+  private var sparkConf: SparkConf = _
   private var createdPodArgumentCaptor: ArgumentCaptor[Pod] = _
   private var createdResourcesArgumentCaptor: ArgumentCaptor[HasMetadata] = _
-  private var createdContainerArgumentCaptor: ArgumentCaptor[Container] = _
 
   before {
     MockitoAnnotations.initMocks(this)
+    sparkConf = new SparkConf(false)
+    kubernetesConf = KubernetesConf[KubernetesDriverSpecificConf](
+      sparkConf,
+      KubernetesDriverSpecificConf(None, MAIN_CLASS, APP_NAME, APP_ARGS),
+      KUBERNETES_RESOURCE_PREFIX,
+      APP_ID,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty)
+    when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC)
     when(kubernetesClient.pods()).thenReturn(podOperations)
-    when(podOperations.withName(FirstTestConfigurationStep.podName)).thenReturn(namedPods)
+    when(podOperations.withName(POD_NAME)).thenReturn(namedPods)
 
     createdPodArgumentCaptor = ArgumentCaptor.forClass(classOf[Pod])
    createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
-    when(podOperations.create(createdPodArgumentCaptor.capture())).thenAnswer(new Answer[Pod] {
-      override def answer(invocation: InvocationOnMock): Pod = {
-        new PodBuilder(invocation.getArgumentAt(0, classOf[Pod]))
-          .editMetadata()
-            .withUid(DRIVER_POD_UID)
-            .endMetadata()
-          .withApiVersion(DRIVER_POD_API_VERSION)
-          .withKind(DRIVER_POD_KIND)
-          .build()
-      }
-    })
-    when(podOperations.withName(FirstTestConfigurationStep.podName)).thenReturn(namedPods)
+    when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE)
     when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
     doReturn(resourceList)
       .when(kubernetesClient)
       .resourceList(createdResourcesArgumentCaptor.capture())
   }
 
-  test("The client should configure the pod with the submission steps.") {
+  test("The client should configure the pod using the builder.") {
     val submissionClient = new Client(
-      submissionSteps,
-      new SparkConf(false),
+      driverBuilder,
+      kubernetesConf,
       kubernetesClient,
       false,
       "spark",
       loggingPodStatusWatcher,
       KUBERNETES_RESOURCE_PREFIX)
     submissionClient.run()
-    val createdPod = createdPodArgumentCaptor.getValue
-    assert(createdPod.getMetadata.getName === FirstTestConfigurationStep.podName)
-    assert(createdPod.getMetadata.getLabels.asScala ===
-      Map(FirstTestConfigurationStep.labelKey -> FirstTestConfigurationStep.labelValue))
-    assert(createdPod.getMetadata.getAnnotations.asScala ===
-      Map(SecondTestConfigurationStep.annotationKey ->
-        SecondTestConfigurationStep.annotationValue))
-    assert(createdPod.getSpec.getContainers.size() === 1)
-    assert(createdPod.getSpec.getContainers.get(0).getName ===
-      SecondTestConfigurationStep.containerName)
+    verify(podOperations).create(FULL_EXPECTED_POD)
   }
 
   test("The client should create Kubernetes resources") {
-    val EXAMPLE_JAVA_OPTS = "-XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails"
-    val EXPECTED_JAVA_OPTS = "-XX\\:+HeapDumpOnOutOfMemoryError -XX\\:+PrintGCDetails"
     val submissionClient = new Client(
-      submissionSteps,
-      new SparkConf(false)
-        .set(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS, EXAMPLE_JAVA_OPTS),
+      driverBuilder,
+      kubernetesConf,
       kubernetesClient,
       false,
       "spark",
       loggingPodStatusWatcher,
       KUBERNETES_RESOURCE_PREFIX)
     submissionClient.run()
-    val createdPod = createdPodArgumentCaptor.getValue
     val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues
     assert(otherCreatedResources.size === 2)
-    val secrets = otherCreatedResources.toArray
-      .filter(_.isInstanceOf[Secret]).map(_.asInstanceOf[Secret])
+    val secrets = otherCreatedResources.toArray.filter(_.isInstanceOf[Secret]).toSeq
+    assert(secrets === ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES)
     val configMaps = otherCreatedResources.toArray
       .filter(_.isInstanceOf[ConfigMap]).map(_.asInstanceOf[ConfigMap])
     assert(secrets.nonEmpty)
-    val secret = secrets.head
-    assert(secret.getMetadata.getName === FirstTestConfigurationStep.secretName)
-    assert(secret.getData.asScala ===
-      Map(FirstTestConfigurationStep.secretKey -> FirstTestConfigurationStep.secretData))
-    val ownerReference = Iterables.getOnlyElement(secret.getMetadata.getOwnerReferences)
-    assert(ownerReference.getName === createdPod.getMetadata.getName)
-    assert(ownerReference.getKind === DRIVER_POD_KIND)
-    assert(ownerReference.getUid === DRIVER_POD_UID)
-    assert(ownerReference.getApiVersion === DRIVER_POD_API_VERSION)
     assert(configMaps.nonEmpty)
     val configMap = configMaps.head
     assert(configMap.getMetadata.getName ===
       s"$KUBERNETES_RESOURCE_PREFIX-driver-conf-map")
     assert(configMap.getData.containsKey(SPARK_CONF_FILE_NAME))
-    assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains(EXPECTED_JAVA_OPTS))
-    assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains(
-      "spark.custom-conf=custom-conf-value"))
-    val driverContainer = Iterables.getOnlyElement(createdPod.getSpec.getContainers)
-    assert(driverContainer.getName === SecondTestConfigurationStep.containerName)
-    val driverEnv = driverContainer.getEnv.asScala.head
-    assert(driverEnv.getName === ENV_SPARK_CONF_DIR)
-    assert(driverEnv.getValue === SPARK_CONF_DIR_INTERNAL)
-    val driverMount = driverContainer.getVolumeMounts.asScala.head
-    assert(driverMount.getName === SPARK_CONF_VOLUME)
-    assert(driverMount.getMountPath === SPARK_CONF_DIR_INTERNAL)
+    assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf1key=conf1value"))
+    assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf2key=conf2value"))
   }
 
   test("Waiting for app completion should stall on the watcher") {
     val submissionClient = new Client(
-      submissionSteps,
-      new SparkConf(false),
+      driverBuilder,
+      kubernetesConf,
       kubernetesClient,
       true,
       "spark",
@@ -171,56 +207,4 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
     submissionClient.run()
     verify(loggingPodStatusWatcher).awaitCompletion()
   }
-
-}
-
-private object FirstTestConfigurationStep extends DriverConfigurationStep {
-
-  val podName = "test-pod"
-  val secretName = "test-secret"
-  val labelKey = "first-submit"
-  val labelValue = "true"
-  val secretKey = "secretKey"
-  val secretData = "secretData"
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val modifiedPod = new PodBuilder(driverSpec.driverPod)
-      .editMetadata()
-      .withName(podName)
-      .addToLabels(labelKey, labelValue)
-      .endMetadata()
-      .build()
-    val additionalResource = new SecretBuilder()
-      .withNewMetadata()
-      .withName(secretName)
-      .endMetadata()
-      .addToData(secretKey, secretData)
-      .build()
-    driverSpec.copy(
-      driverPod = modifiedPod,
-      otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(additionalResource))
-  }
-}
-
-private object SecondTestConfigurationStep extends DriverConfigurationStep {
-  val annotationKey = "second-submit"
-  val annotationValue = "submitted"
-  val sparkConfKey = "spark.custom-conf"
-  val sparkConfValue = "custom-conf-value"
-  val containerName = "driverContainer"
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val modifiedPod = new PodBuilder(driverSpec.driverPod)
-      .editMetadata()
-        .addToAnnotations(annotationKey, annotationValue)
-        .endMetadata()
-      .build()
-    val resolvedSparkConf = driverSpec.driverSparkConf.clone().set(sparkConfKey, sparkConfValue)
-    val modifiedContainer = new ContainerBuilder(driverSpec.driverContainer)
-      .withName(containerName)
-      .build()
-    driverSpec.copy(
-      driverPod = modifiedPod,
-      driverSparkConf = resolvedSparkConf,
-      driverContainer = modifiedContainer)
-  }
 }
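
For context: the rewritten suite no longer drives the pod through a chain of DriverConfigurationStep objects; it stubs a single KubernetesDriverBuilder and only checks that the client submits exactly the spec the builder returns. A sketch of the spec's shape, reconstructed from how BUILT_KUBERNETES_SPEC is built and consumed above (not the authoritative definition):

    // Assumed shape of KubernetesDriverSpec, inferred from
    // KubernetesDriverSpec(SparkPod(...), ADDITIONAL_RESOURCES, RESOLVED_JAVA_OPTIONS)
    // and the accessors used in these suites.
    case class KubernetesDriverSpec(
        pod: SparkPod,                                // driver pod plus its container
        driverKubernetesResources: Seq[HasMetadata],  // extra resources (secrets, config maps)
        systemProperties: Map[String, String])        // resolved Spark conf entries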

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
deleted file mode 100644
index df34d2d..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit
-
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.submit.steps._
-
-class DriverConfigOrchestratorSuite extends SparkFunSuite {
-
-  private val DRIVER_IMAGE = "driver-image"
-  private val IC_IMAGE = "init-container-image"
-  private val APP_ID = "spark-app-id"
-  private val KUBERNETES_RESOURCE_PREFIX = "example-prefix"
-  private val APP_NAME = "spark"
-  private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
-  private val APP_ARGS = Array("arg1", "arg2")
-  private val SECRET_FOO = "foo"
-  private val SECRET_BAR = "bar"
-  private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
-
-  test("Base submission steps with a main app resource.") {
-    val sparkConf = new SparkConf(false).set(CONTAINER_IMAGE, DRIVER_IMAGE)
-    val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
-    val orchestrator = new DriverConfigOrchestrator(
-      APP_ID,
-      KUBERNETES_RESOURCE_PREFIX,
-      Some(mainAppResource),
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    validateStepTypes(
-      orchestrator,
-      classOf[BasicDriverConfigurationStep],
-      classOf[DriverServiceBootstrapStep],
-      classOf[DriverKubernetesCredentialsStep],
-      classOf[DependencyResolutionStep])
-  }
-
-  test("Base submission steps without a main app resource.") {
-    val sparkConf = new SparkConf(false).set(CONTAINER_IMAGE, DRIVER_IMAGE)
-    val orchestrator = new DriverConfigOrchestrator(
-      APP_ID,
-      KUBERNETES_RESOURCE_PREFIX,
-      Option.empty,
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    validateStepTypes(
-      orchestrator,
-      classOf[BasicDriverConfigurationStep],
-      classOf[DriverServiceBootstrapStep],
-      classOf[DriverKubernetesCredentialsStep])
-  }
-
-  test("Submission steps with driver secrets to mount") {
-    val sparkConf = new SparkConf(false)
-      .set(CONTAINER_IMAGE, DRIVER_IMAGE)
-      .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH)
-      .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH)
-    val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
-    val orchestrator = new DriverConfigOrchestrator(
-      APP_ID,
-      KUBERNETES_RESOURCE_PREFIX,
-      Some(mainAppResource),
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    validateStepTypes(
-      orchestrator,
-      classOf[BasicDriverConfigurationStep],
-      classOf[DriverServiceBootstrapStep],
-      classOf[DriverKubernetesCredentialsStep],
-      classOf[DependencyResolutionStep],
-      classOf[DriverMountSecretsStep])
-  }
-
-  test("Submission using client local dependencies") {
-    val sparkConf = new SparkConf(false)
-      .set(CONTAINER_IMAGE, DRIVER_IMAGE)
-    var orchestrator = new DriverConfigOrchestrator(
-      APP_ID,
-      KUBERNETES_RESOURCE_PREFIX,
-      Some(JavaMainAppResource("file:///var/apps/jars/main.jar")),
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    assertThrows[SparkException] {
-      orchestrator.getAllConfigurationSteps
-    }
-
-    sparkConf.set("spark.files", "/path/to/file1,/path/to/file2")
-    orchestrator = new DriverConfigOrchestrator(
-      APP_ID,
-      KUBERNETES_RESOURCE_PREFIX,
-      Some(JavaMainAppResource("local:///var/apps/jars/main.jar")),
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    assertThrows[SparkException] {
-      orchestrator.getAllConfigurationSteps
-    }
-  }
-
-  private def validateStepTypes(
-      orchestrator: DriverConfigOrchestrator,
-      types: Class[_ <: DriverConfigurationStep]*): Unit = {
-    val steps = orchestrator.getAllConfigurationSteps
-    assert(steps.size === types.size)
-    assert(steps.map(_.getClass) === types)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
new file mode 100644
index 0000000..161f9af
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf}
+import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, KubernetesFeaturesTestUtils, MountSecretsFeatureStep}
+
+class KubernetesDriverBuilderSuite extends SparkFunSuite {
+
+  private val BASIC_STEP_TYPE = "basic"
+  private val CREDENTIALS_STEP_TYPE = "credentials"
+  private val SERVICE_STEP_TYPE = "service"
+  private val SECRETS_STEP_TYPE = "mount-secrets"
+
+  private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    BASIC_STEP_TYPE, classOf[BasicDriverFeatureStep])
+
+  private val credentialsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    CREDENTIALS_STEP_TYPE, classOf[DriverKubernetesCredentialsFeatureStep])
+
+  private val serviceStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    SERVICE_STEP_TYPE, classOf[DriverServiceFeatureStep])
+
+  private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
+
+  private val builderUnderTest: KubernetesDriverBuilder =
+    new KubernetesDriverBuilder(
+      _ => basicFeatureStep,
+      _ => credentialsStep,
+      _ => serviceStep,
+      _ => secretsStep)
+
+  test("Apply fundamental steps all the time.") {
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesDriverSpecificConf(
+        None,
+        "test-app",
+        "main",
+        Seq.empty),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty)
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      CREDENTIALS_STEP_TYPE,
+      SERVICE_STEP_TYPE)
+  }
+
+  test("Apply secrets step if secrets are present.") {
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesDriverSpecificConf(
+        None,
+        "test-app",
+        "main",
+        Seq.empty),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map("secret" -> "secretMountPath"),
+      Map.empty)
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      CREDENTIALS_STEP_TYPE,
+      SERVICE_STEP_TYPE,
+      SECRETS_STEP_TYPE)
+  }
+
+  private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
+    : Unit = {
+    assert(resolvedSpec.systemProperties.size === stepTypes.size)
+    stepTypes.foreach { stepType =>
+      assert(resolvedSpec.pod.pod.getMetadata.getLabels.get(stepType) === stepType)
+      assert(resolvedSpec.driverKubernetesResources.containsSlice(
+        KubernetesFeaturesTestUtils.getSecretsForStepType(stepType)))
+      assert(resolvedSpec.systemProperties(stepType) === stepType)
+    }
+  }
+}
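
KubernetesFeaturesTestUtils is also not shown in this diff. Below is a plausible Mockito-based sketch consistent with the assertions in validateStepTypesApplied, assuming the feature steps share a common KubernetesFeatureConfigStep trait: each mock step labels the pod, contributes one secret, and adds one system property, all keyed by its step type.

    import io.fabric8.kubernetes.api.model.{HasMetadata, PodBuilder, SecretBuilder}
    import org.mockito.Matchers.any
    import org.mockito.Mockito.{mock, when}
    import org.mockito.invocation.InvocationOnMock
    import org.mockito.stubbing.Answer

    import org.apache.spark.deploy.k8s.SparkPod

    object KubernetesFeaturesTestUtils {

      def getMockConfigStepForStepType[T <: KubernetesFeatureConfigStep](
          stepType: String, stepClass: Class[T]): T = {
        val mockStep = mock(stepClass)
        when(mockStep.getAdditionalKubernetesResources())
          .thenReturn(getSecretsForStepType(stepType))
        when(mockStep.getAdditionalPodSystemProperties())
          .thenReturn(Map(stepType -> stepType))
        // Tag the pod with a label so the builder suite can see this step ran.
        when(mockStep.configurePod(any(classOf[SparkPod]))).thenAnswer(new Answer[SparkPod] {
          override def answer(invocation: InvocationOnMock): SparkPod = {
            val original = invocation.getArgumentAt(0, classOf[SparkPod])
            val labeled = new PodBuilder(original.pod)
              .editOrNewMetadata().addToLabels(stepType, stepType).endMetadata()
              .build()
            SparkPod(labeled, original.container)
          }
        })
        mockStep
      }

      def getSecretsForStepType(stepType: String): Seq[HasMetadata] =
        Seq(new SecretBuilder().withNewMetadata().withName(stepType).endMetadata().build())
    }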

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
deleted file mode 100644
index ee450ff..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-class BasicDriverConfigurationStepSuite extends SparkFunSuite {
-
-  private val APP_ID = "spark-app-id"
-  private val RESOURCE_NAME_PREFIX = "spark"
-  private val DRIVER_LABELS = Map("labelkey" -> "labelvalue")
-  private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
-  private val APP_NAME = "spark-test"
-  private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
-  private val APP_ARGS = Array("arg1", "arg2", "\"arg 3\"")
-  private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
-  private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
-  private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
-  private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2"
-
-  test("Set all possible configurations from the user.") {
-    val sparkConf = new SparkConf()
-      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
-      .set(org.apache.spark.internal.config.DRIVER_CLASS_PATH, "/opt/spark/spark-examples.jar")
-      .set("spark.driver.cores", "2")
-      .set(KUBERNETES_DRIVER_LIMIT_CORES, "4")
-      .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M")
-      .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L)
-      .set(CONTAINER_IMAGE, "spark-driver:latest")
-      .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", 
CUSTOM_ANNOTATION_VALUE)
-      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", 
"customDriverEnv1")
-      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", 
"customDriverEnv2")
-      .set(IMAGE_PULL_SECRETS, "imagePullSecret1, imagePullSecret2")
-
-    val submissionStep = new BasicDriverConfigurationStep(
-      APP_ID,
-      RESOURCE_NAME_PREFIX,
-      DRIVER_LABELS,
-      CONTAINER_IMAGE_PULL_POLICY,
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    val basePod = new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build()
-    val baseDriverSpec = KubernetesDriverSpec(
-      driverPod = basePod,
-      driverContainer = new ContainerBuilder().build(),
-      driverSparkConf = new SparkConf(false),
-      otherKubernetesResources = Seq.empty[HasMetadata])
-    val preparedDriverSpec = submissionStep.configureDriver(baseDriverSpec)
-
-    assert(preparedDriverSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
-    assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest")
-    assert(preparedDriverSpec.driverContainer.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY)
-
-    assert(preparedDriverSpec.driverContainer.getEnv.size === 4)
-    val envs = preparedDriverSpec.driverContainer
-      .getEnv
-      .asScala
-      .map(env => (env.getName, env.getValue))
-      .toMap
-    assert(envs(ENV_CLASSPATH) === "/opt/spark/spark-examples.jar")
-    assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
-    assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
-
-    assert(preparedDriverSpec.driverContainer.getEnv.asScala.exists(envVar =>
-      envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) &&
-        envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
-        envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP")))
-
-    val resourceRequirements = preparedDriverSpec.driverContainer.getResources
-    val requests = resourceRequirements.getRequests.asScala
-    assert(requests("cpu").getAmount === "2")
-    assert(requests("memory").getAmount === "456Mi")
-    val limits = resourceRequirements.getLimits.asScala
-    assert(limits("memory").getAmount === "456Mi")
-    assert(limits("cpu").getAmount === "4")
-
-    val driverPodMetadata = preparedDriverSpec.driverPod.getMetadata
-    assert(driverPodMetadata.getName === "spark-driver-pod")
-    assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
-    val expectedAnnotations = Map(
-      CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
-      SPARK_APP_NAME_ANNOTATION -> APP_NAME)
-    assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations)
-
-    val driverPodSpec = preparedDriverSpec.driverPod.getSpec
-    assert(driverPodSpec.getRestartPolicy === "Never")
-    assert(driverPodSpec.getImagePullSecrets.size() === 2)
-    assert(driverPodSpec.getImagePullSecrets.get(0).getName === "imagePullSecret1")
-    assert(driverPodSpec.getImagePullSecrets.get(1).getName === "imagePullSecret2")
-
-    val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap
-    val expectedSparkConf = Map(
-      KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
-      "spark.app.id" -> APP_ID,
-      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX,
-      "spark.kubernetes.submitInDriver" -> "true")
-    assert(resolvedSparkConf === expectedSparkConf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
deleted file mode 100644
index ca43fc9..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import java.io.File
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-class DependencyResolutionStepSuite extends SparkFunSuite {
-
-  private val SPARK_JARS = Seq(
-    "apps/jars/jar1.jar",
-    "local:///var/apps/jars/jar2.jar")
-
-  private val SPARK_FILES = Seq(
-    "apps/files/file1.txt",
-    "local:///var/apps/files/file2.txt")
-
-  test("Added dependencies should be resolved in Spark configuration and 
environment") {
-    val dependencyResolutionStep = new DependencyResolutionStep(
-      SPARK_JARS,
-      SPARK_FILES)
-    val driverPod = new PodBuilder().build()
-    val baseDriverSpec = KubernetesDriverSpec(
-      driverPod = driverPod,
-      driverContainer = new ContainerBuilder().build(),
-      driverSparkConf = new SparkConf(false),
-      otherKubernetesResources = Seq.empty[HasMetadata])
-    val preparedDriverSpec = dependencyResolutionStep.configureDriver(baseDriverSpec)
-    assert(preparedDriverSpec.driverPod === driverPod)
-    assert(preparedDriverSpec.otherKubernetesResources.isEmpty)
-    val resolvedSparkJars = preparedDriverSpec.driverSparkConf.get("spark.jars").split(",").toSet
-    val expectedResolvedSparkJars = Set(
-      "apps/jars/jar1.jar",
-      "/var/apps/jars/jar2.jar")
-    assert(resolvedSparkJars === expectedResolvedSparkJars)
-    val resolvedSparkFiles = preparedDriverSpec.driverSparkConf.get("spark.files").split(",").toSet
-    val expectedResolvedSparkFiles = Set(
-      "apps/files/file1.txt",
-      "/var/apps/files/file2.txt")
-    assert(resolvedSparkFiles === expectedResolvedSparkFiles)
-    val driverEnv = preparedDriverSpec.driverContainer.getEnv.asScala
-    assert(driverEnv.size === 1)
-    assert(driverEnv.head.getName === ENV_MOUNTED_CLASSPATH)
-    val resolvedDriverClasspath = driverEnv.head.getValue.split(File.pathSeparator).toSet
-    val expectedResolvedDriverClasspath = expectedResolvedSparkJars
-    assert(resolvedDriverClasspath === expectedResolvedDriverClasspath)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStepSuite.scala
deleted file mode 100644
index 64553d2..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStepSuite.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import java.io.File
-
-import scala.collection.JavaConverters._
-
-import com.google.common.base.Charsets
-import com.google.common.io.{BaseEncoding, Files}
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, Secret}
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-import org.apache.spark.util.Utils
-
-class DriverKubernetesCredentialsStepSuite extends SparkFunSuite with BeforeAndAfter {
-
-  private val KUBERNETES_RESOURCE_NAME_PREFIX = "spark"
-  private var credentialsTempDirectory: File = _
-  private val BASE_DRIVER_SPEC = new KubernetesDriverSpec(
-    driverPod = new PodBuilder().build(),
-    driverContainer = new ContainerBuilder().build(),
-    driverSparkConf = new SparkConf(false),
-    otherKubernetesResources = Seq.empty[HasMetadata])
-
-  before {
-    credentialsTempDirectory = Utils.createTempDir()
-  }
-
-  after {
-    credentialsTempDirectory.delete()
-  }
-
-  test("Don't set any credentials") {
-    val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
-        new SparkConf(false), KUBERNETES_RESOURCE_NAME_PREFIX)
-    val preparedDriverSpec = kubernetesCredentialsStep.configureDriver(BASE_DRIVER_SPEC)
-    assert(preparedDriverSpec.driverPod === BASE_DRIVER_SPEC.driverPod)
-    assert(preparedDriverSpec.driverContainer === BASE_DRIVER_SPEC.driverContainer)
-    assert(preparedDriverSpec.otherKubernetesResources.isEmpty)
-    assert(preparedDriverSpec.driverSparkConf.getAll.isEmpty)
-  }
-
-  test("Only set credentials that are manually mounted.") {
-    val submissionSparkConf = new SparkConf(false)
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX",
-        "/mnt/secrets/my-token.txt")
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
-        "/mnt/secrets/my-key.pem")
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
-        "/mnt/secrets/my-cert.pem")
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
-        "/mnt/secrets/my-ca.pem")
-
-    val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
-      submissionSparkConf, KUBERNETES_RESOURCE_NAME_PREFIX)
-    val preparedDriverSpec = kubernetesCredentialsStep.configureDriver(BASE_DRIVER_SPEC)
-    assert(preparedDriverSpec.driverPod === BASE_DRIVER_SPEC.driverPod)
-    assert(preparedDriverSpec.driverContainer === BASE_DRIVER_SPEC.driverContainer)
-    assert(preparedDriverSpec.otherKubernetesResources.isEmpty)
-    assert(preparedDriverSpec.driverSparkConf.getAll.toMap === submissionSparkConf.getAll.toMap)
-  }
-
-  test("Mount credentials from the submission client as a secret.") {
-    val caCertFile = writeCredentials("ca.pem", "ca-cert")
-    val clientKeyFile = writeCredentials("key.pem", "key")
-    val clientCertFile = writeCredentials("cert.pem", "cert")
-    val submissionSparkConf = new SparkConf(false)
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX",
-        "token")
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
-        clientKeyFile.getAbsolutePath)
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
-        clientCertFile.getAbsolutePath)
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
-        caCertFile.getAbsolutePath)
-    val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
-      submissionSparkConf, KUBERNETES_RESOURCE_NAME_PREFIX)
-    val preparedDriverSpec = kubernetesCredentialsStep.configureDriver(
-      BASE_DRIVER_SPEC.copy(driverSparkConf = submissionSparkConf))
-    val expectedSparkConf = Map(
-      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX" -> 
"<present_but_redacted>",
-      
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX" ->
-        DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH,
-      
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX" ->
-        DRIVER_CREDENTIALS_CLIENT_KEY_PATH,
-      
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX" ->
-        DRIVER_CREDENTIALS_CLIENT_CERT_PATH,
-      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX" 
->
-        DRIVER_CREDENTIALS_CA_CERT_PATH,
-      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX" ->
-        clientKeyFile.getAbsolutePath,
-      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX" ->
-        clientCertFile.getAbsolutePath,
-      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX" ->
-        caCertFile.getAbsolutePath)
-    assert(preparedDriverSpec.driverSparkConf.getAll.toMap === expectedSparkConf)
-    assert(preparedDriverSpec.otherKubernetesResources.size === 1)
-    val credentialsSecret = preparedDriverSpec.otherKubernetesResources.head.asInstanceOf[Secret]
-    assert(credentialsSecret.getMetadata.getName ===
-      s"$KUBERNETES_RESOURCE_NAME_PREFIX-kubernetes-credentials")
-    val decodedSecretData = credentialsSecret.getData.asScala.map { data =>
-      (data._1, new String(BaseEncoding.base64().decode(data._2), Charsets.UTF_8))
-    }
-    val expectedSecretData = Map(
-      DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME -> "ca-cert",
-      DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME -> "token",
-      DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME -> "key",
-      DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME -> "cert")
-    assert(decodedSecretData === expectedSecretData)
-    val driverPodVolumes = preparedDriverSpec.driverPod.getSpec.getVolumes.asScala
-    assert(driverPodVolumes.size === 1)
-    assert(driverPodVolumes.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
-    assert(driverPodVolumes.head.getSecret != null)
-    assert(driverPodVolumes.head.getSecret.getSecretName === credentialsSecret.getMetadata.getName)
-    val driverContainerVolumeMount = preparedDriverSpec.driverContainer.getVolumeMounts.asScala
-    assert(driverContainerVolumeMount.size === 1)
-    assert(driverContainerVolumeMount.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
-    assert(driverContainerVolumeMount.head.getMountPath === DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
-  }
-
-  private def writeCredentials(credentialsFileName: String, credentialsContents: String): File = {
-    val credentialsFile = new File(credentialsTempDirectory, credentialsFileName)
-    Files.write(credentialsContents, credentialsFile, Charsets.UTF_8)
-    credentialsFile
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala
deleted file mode 100644
index 960d0bd..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{MountSecretsBootstrap, SecretVolumeUtils}
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-class DriverMountSecretsStepSuite extends SparkFunSuite {
-
-  private val SECRET_FOO = "foo"
-  private val SECRET_BAR = "bar"
-  private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
-
-  test("mounts all given secrets") {
-    val baseDriverSpec = KubernetesDriverSpec.initialSpec(new SparkConf(false))
-    val secretNamesToMountPaths = Map(
-      SECRET_FOO -> SECRET_MOUNT_PATH,
-      SECRET_BAR -> SECRET_MOUNT_PATH)
-
-    val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
-    val mountSecretsStep = new DriverMountSecretsStep(mountSecretsBootstrap)
-    val configuredDriverSpec = mountSecretsStep.configureDriver(baseDriverSpec)
-    val driverPodWithSecretsMounted = configuredDriverSpec.driverPod
-    val driverContainerWithSecretsMounted = configuredDriverSpec.driverContainer
-
-    Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName =>
-      assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, volumeName))
-    }
-    Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName =>
-      assert(SecretVolumeUtils.containerHasVolume(
-        driverContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
deleted file mode 100644
index 78c8c3b..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.Service
-import org.mockito.{Mock, MockitoAnnotations}
-import org.mockito.Mockito.when
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-import org.apache.spark.util.Clock
-
-class DriverServiceBootstrapStepSuite extends SparkFunSuite with BeforeAndAfter {
-
-  private val SHORT_RESOURCE_NAME_PREFIX =
-    "a" * (DriverServiceBootstrapStep.MAX_SERVICE_NAME_LENGTH -
-      DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX.length)
-
-  private val LONG_RESOURCE_NAME_PREFIX =
-    "a" * (DriverServiceBootstrapStep.MAX_SERVICE_NAME_LENGTH -
-      DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX.length + 1)
-  private val DRIVER_LABELS = Map(
-    "label1key" -> "label1value",
-    "label2key" -> "label2value")
-
-  @Mock
-  private var clock: Clock = _
-
-  private var sparkConf: SparkConf = _
-
-  before {
-    MockitoAnnotations.initMocks(this)
-    sparkConf = new SparkConf(false)
-  }
-
-  test("Headless service has a port for the driver RPC and the block 
manager.") {
-    val configurationStep = new DriverServiceBootstrapStep(
-      SHORT_RESOURCE_NAME_PREFIX,
-      DRIVER_LABELS,
-      sparkConf
-        .set("spark.driver.port", "9000")
-        .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080),
-      clock)
-    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
-    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
-    assert(resolvedDriverSpec.otherKubernetesResources.size === 1)
-    assert(resolvedDriverSpec.otherKubernetesResources.head.isInstanceOf[Service])
-    val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]
-    verifyService(
-      9000,
-      8080,
-      s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}",
-      driverService)
-  }
-
-  test("Hostname and ports are set according to the service name.") {
-    val configurationStep = new DriverServiceBootstrapStep(
-      SHORT_RESOURCE_NAME_PREFIX,
-      DRIVER_LABELS,
-      sparkConf
-        .set("spark.driver.port", "9000")
-        .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
-        .set(KUBERNETES_NAMESPACE, "my-namespace"),
-      clock)
-    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
-    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
-    val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
-      DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX
-    val expectedHostName = s"$expectedServiceName.my-namespace.svc"
-    verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName)
-  }
-
-  test("Ports should resolve to defaults in SparkConf and in the service.") {
-    val configurationStep = new DriverServiceBootstrapStep(
-      SHORT_RESOURCE_NAME_PREFIX,
-      DRIVER_LABELS,
-      sparkConf,
-      clock)
-    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
-    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
-    verifyService(
-      DEFAULT_DRIVER_PORT,
-      DEFAULT_BLOCKMANAGER_PORT,
-      s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}",
-      resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service])
-    assert(resolvedDriverSpec.driverSparkConf.get("spark.driver.port") ===
-      DEFAULT_DRIVER_PORT.toString)
-    assert(resolvedDriverSpec.driverSparkConf.get(
-      org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT) === DEFAULT_BLOCKMANAGER_PORT)
-  }
-
-  test("Long prefixes should switch to using a generated name.") {
-    val configurationStep = new DriverServiceBootstrapStep(
-      LONG_RESOURCE_NAME_PREFIX,
-      DRIVER_LABELS,
-      sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"),
-      clock)
-    when(clock.getTimeMillis()).thenReturn(10000)
-    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
-    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
-    val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]
-    val expectedServiceName = s"spark-10000${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}"
-    assert(driverService.getMetadata.getName === expectedServiceName)
-    val expectedHostName = s"$expectedServiceName.my-namespace.svc"
-    verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName)
-  }
-
-  test("Disallow bind address and driver host to be set explicitly.") {
-    val configurationStep = new DriverServiceBootstrapStep(
-      LONG_RESOURCE_NAME_PREFIX,
-      DRIVER_LABELS,
-      sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"),
-      clock)
-    try {
-      configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf))
-      fail("The driver bind address should not be allowed.")
-    } catch {
-      case e: Throwable =>
-        assert(e.getMessage ===
-          s"requirement failed: 
${DriverServiceBootstrapStep.DRIVER_BIND_ADDRESS_KEY} is" +
-          " not supported in Kubernetes mode, as the driver's bind address is 
managed" +
-          " and set to the driver pod's IP address.")
-    }
-    sparkConf.remove(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS)
-    sparkConf.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, "host")
-    try {
-      configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf))
-      fail("The driver host address should not be allowed.")
-    } catch {
-      case e: Throwable =>
-        assert(e.getMessage ===
-          s"requirement failed: ${DriverServiceBootstrapStep.DRIVER_HOST_KEY} 
is" +
-          " not supported in Kubernetes mode, as the driver's hostname will be 
managed via" +
-          " a Kubernetes service.")
-    }
-  }
-
-  private def verifyService(
-      driverPort: Int,
-      blockManagerPort: Int,
-      expectedServiceName: String,
-      service: Service): Unit = {
-    assert(service.getMetadata.getName === expectedServiceName)
-    assert(service.getSpec.getClusterIP === "None")
-    assert(service.getSpec.getSelector.asScala === DRIVER_LABELS)
-    assert(service.getSpec.getPorts.size() === 2)
-    val driverServicePorts = service.getSpec.getPorts.asScala
-    assert(driverServicePorts.head.getName === DRIVER_PORT_NAME)
-    assert(driverServicePorts.head.getPort.intValue() === driverPort)
-    assert(driverServicePorts.head.getTargetPort.getIntVal === driverPort)
-    assert(driverServicePorts(1).getName === BLOCK_MANAGER_PORT_NAME)
-    assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
-    assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
-  }
-
-  private def verifySparkConfHostNames(
-      driverSparkConf: SparkConf, expectedHostName: String): Unit = {
-    assert(driverSparkConf.get(
-      org.apache.spark.internal.config.DRIVER_HOST_ADDRESS) === expectedHostName)
-  }
-}

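The suite deleted above pinned the driver service to a specific shape: a headless service (clusterIP "None") selecting the driver labels and exposing one port for the driver RPC endpoint and one for the block manager. A minimal sketch of that shape with fabric8's ServiceBuilder follows; the port names and the wrapper object are illustrative stand-ins for the suite's DRIVER_PORT_NAME and BLOCK_MANAGER_PORT_NAME constants, not the production step's code.

    import scala.collection.JavaConverters._

    import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder}

    object DriverServiceSketch {
      // Illustrative only: builds a service matching the deleted verifyService
      // assertions (headless, driver-label selector, two ports).
      def driverService(
          serviceName: String,
          driverLabels: Map[String, String],
          driverPort: Int,
          blockManagerPort: Int): Service = {
        new ServiceBuilder()
          .withNewMetadata()
            .withName(serviceName)
            .endMetadata()
          .withNewSpec()
            .withClusterIP("None") // headless: DNS resolves straight to the driver pod
            .withSelector(driverLabels.asJava)
            .addNewPort()
              .withName("driver-rpc-port") // assumed placeholder name
              .withPort(driverPort)
              .withNewTargetPort(driverPort)
              .endPort()
            .addNewPort()
              .withName("blockmanager") // assumed placeholder name
              .withPort(blockManagerPort)
              .withNewTargetPort(blockManagerPort)
              .endPort()
            .endSpec()
          .build()
      }
    }

A headless service gives executors a stable DNS name that resolves directly to the driver pod's IP, which is why the suite also rejects user-supplied bind and host addresses.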
http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
deleted file mode 100644
index d73df20..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.scheduler.cluster.k8s
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model._
-import org.mockito.MockitoAnnotations
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.MountSecretsBootstrap
-
-class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
-
-  private val driverPodName: String = "driver-pod"
-  private val driverPodUid: String = "driver-uid"
-  private val executorPrefix: String = "base"
-  private val executorImage: String = "executor-image"
-  private val imagePullSecrets: String = "imagePullSecret1, imagePullSecret2"
-  private val driverPod = new PodBuilder()
-    .withNewMetadata()
-    .withName(driverPodName)
-    .withUid(driverPodUid)
-    .endMetadata()
-    .withNewSpec()
-    .withNodeName("some-node")
-    .endSpec()
-    .withNewStatus()
-    .withHostIP("192.168.99.100")
-    .endStatus()
-    .build()
-  private var baseConf: SparkConf = _
-
-  before {
-    MockitoAnnotations.initMocks(this)
-    baseConf = new SparkConf()
-      .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
-      .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix)
-      .set(CONTAINER_IMAGE, executorImage)
-      .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
-      .set(IMAGE_PULL_SECRETS, imagePullSecrets)
-  }
-
-  test("basic executor pod has reasonable defaults") {
-    val factory = new ExecutorPodFactory(baseConf, None)
-    val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
-
-    // The executor pod name and default labels.
-    assert(executor.getMetadata.getName === s"$executorPrefix-exec-1")
-    assert(executor.getMetadata.getLabels.size() === 3)
-    assert(executor.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL) === "1")
-
-    // There is exactly 1 container with no volume mounts and default memory limits and requests.
-    // Default memory limit/request is 1024M + 384M (minimum overhead constant).
-    assert(executor.getSpec.getContainers.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getImage === executorImage)
-    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.isEmpty)
-    assert(executor.getSpec.getContainers.get(0).getResources.getLimits.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getResources
-      .getRequests.get("memory").getAmount === "1408Mi")
-    assert(executor.getSpec.getContainers.get(0).getResources
-      .getLimits.get("memory").getAmount === "1408Mi")
-    assert(executor.getSpec.getImagePullSecrets.size() === 2)
-    assert(executor.getSpec.getImagePullSecrets.get(0).getName === "imagePullSecret1")
-    assert(executor.getSpec.getImagePullSecrets.get(1).getName === "imagePullSecret2")
-
-    // The pod has no node selector, volumes.
-    assert(executor.getSpec.getNodeSelector.isEmpty)
-    assert(executor.getSpec.getVolumes.isEmpty)
-
-    checkEnv(executor, Map())
-    checkOwnerReferences(executor, driverPodUid)
-  }
-
-  test("executor core request specification") {
-    var factory = new ExecutorPodFactory(baseConf, None)
-    var executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
-    assert(executor.getSpec.getContainers.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
-      === "1")
-
-    val conf = baseConf.clone()
-
-    conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "0.1")
-    factory = new ExecutorPodFactory(conf, None)
-    executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
-    assert(executor.getSpec.getContainers.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
-      === "0.1")
-
-    conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "100m")
-    factory = new ExecutorPodFactory(conf, None)
-    conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "100m")
-    executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
-    assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
-      === "100m")
-  }
-
-  test("executor pod hostnames get truncated to 63 characters") {
-    val conf = baseConf.clone()
-    conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,
-      "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple")
-
-    val factory = new ExecutorPodFactory(conf, None)
-    val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
-
-    assert(executor.getSpec.getHostname.length === 63)
-  }
-
-  test("classpath and extra java options get translated into environment 
variables") {
-    val conf = baseConf.clone()
-    conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
-    conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz")
-
-    val factory = new ExecutorPodFactory(conf, None)
-    val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), 
driverPod, Map[String, Int]())
-
-    checkEnv(executor,
-      Map("SPARK_JAVA_OPT_0" -> "foo=bar",
-        ENV_CLASSPATH -> "bar=baz",
-        "qux" -> "quux"))
-    checkOwnerReferences(executor, driverPodUid)
-  }
-
-  test("executor secrets get mounted") {
-    val conf = baseConf.clone()
-
-    val secretsBootstrap = new MountSecretsBootstrap(Map("secret1" -> "/var/secret1"))
-    val factory = new ExecutorPodFactory(conf, Some(secretsBootstrap))
-    val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
-
-    assert(executor.getSpec.getContainers.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName
-      === "secret1-volume")
-    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
-      .getMountPath === "/var/secret1")
-
-    // check volume mounted.
-    assert(executor.getSpec.getVolumes.size() === 1)
-    assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName === "secret1")
-
-    checkOwnerReferences(executor, driverPodUid)
-  }
-
-  // There is always exactly one controller reference, and it points to the driver pod.
-  private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
-    assert(executor.getMetadata.getOwnerReferences.size() === 1)
-    assert(executor.getMetadata.getOwnerReferences.get(0).getUid === driverPodUid)
-    assert(executor.getMetadata.getOwnerReferences.get(0).getController === true)
-  }
-
-  // Check that the expected environment variables are present.
-  private def checkEnv(executor: Pod, additionalEnvVars: Map[String, String]): Unit = {
-    val defaultEnvs = Map(
-      ENV_EXECUTOR_ID -> "1",
-      ENV_DRIVER_URL -> "dummy",
-      ENV_EXECUTOR_CORES -> "1",
-      ENV_EXECUTOR_MEMORY -> "1g",
-      ENV_APPLICATION_ID -> "dummy",
-      ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
-      ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
-
-    assert(executor.getSpec.getContainers.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getEnv.size() === defaultEnvs.size)
-    val mapEnvs = executor.getSpec.getContainers.get(0).getEnv.asScala.map {
-      x => (x.getName, x.getValue)
-    }.toMap
-    assert(defaultEnvs === mapEnvs)
-  }
-}

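One detail worth keeping from the deleted suite: the "1408Mi" figures come from Spark's executor memory overhead rule. A small sketch of that arithmetic, assuming Spark's documented defaults of a 0.10 overhead factor and a 384 MiB minimum overhead:

    object ExecutorMemorySketch {
      // 10% of 1024 MiB is ~102 MiB, below the 384 MiB floor, so the floor applies.
      val executorMemoryMiB = 1024
      val overheadFactor = 0.10 // assumed default overhead factor
      val minOverheadMiB = 384  // the "minimum overhead constant" in the deleted comment
      val overheadMiB = math.max((executorMemoryMiB * overheadFactor).toInt, minOverheadMiB)
      val totalMiB = executorMemoryMiB + overheadMiB // 1024 + 384 = 1408, hence "1408Mi"
    }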
http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index b2f26f2..96065e8 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -18,11 +18,12 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit}
 
-import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder, PodList}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, DoneablePod, Pod, PodBuilder, PodList}
 import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource}
-import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations}
+import org.hamcrest.{BaseMatcher, Description, Matcher}
+import org.mockito.{AdditionalAnswers, ArgumentCaptor, Matchers, Mock, MockitoAnnotations}
 import org.mockito.Matchers.{any, eq => mockitoEq}
 import org.mockito.Mockito.{doNothing, never, times, verify, when}
 import org.scalatest.BeforeAndAfter
@@ -31,6 +32,7 @@ import scala.collection.JavaConverters._
 import scala.concurrent.Future
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.rpc._
@@ -47,8 +49,6 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private val SPARK_DRIVER_HOST = "localhost"
   private val SPARK_DRIVER_PORT = 7077
   private val POD_ALLOCATION_INTERVAL = "1m"
-  private val DRIVER_URL = RpcEndpointAddress(
-    SPARK_DRIVER_HOST, SPARK_DRIVER_PORT, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
   private val FIRST_EXECUTOR_POD = new PodBuilder()
     .withNewMetadata()
       .withName("pod1")
@@ -94,7 +94,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private var requestExecutorsService: ExecutorService = _
 
   @Mock
-  private var executorPodFactory: ExecutorPodFactory = _
+  private var executorBuilder: KubernetesExecutorBuilder = _
 
   @Mock
   private var kubernetesClient: KubernetesClient = _
@@ -399,7 +399,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     new KubernetesClusterSchedulerBackend(
       taskSchedulerImpl,
       rpcEnv,
-      executorPodFactory,
+      executorBuilder,
       kubernetesClient,
       allocatorExecutor,
       requestExecutorsService) {
@@ -428,13 +428,22 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
         .addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
         .endMetadata()
       .build()
-    when(executorPodFactory.createExecutorPod(
-      executorId.toString,
-      APP_ID,
-      DRIVER_URL,
-      sparkConf.getExecutorEnv,
-      driverPod,
-      Map.empty)).thenReturn(resolvedPod)
-    resolvedPod
+    val resolvedContainer = new ContainerBuilder().build()
+    when(executorBuilder.buildFromFeatures(Matchers.argThat(
+      new BaseMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] {
+        override def matches(argument: scala.Any)
+          : Boolean = {
+          argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]] &&
+            argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]
+              .roleSpecificConf.executorId == executorId.toString
+        }
+
+        override def describeTo(description: Description): Unit = {}
+      }))).thenReturn(SparkPod(resolvedPod, resolvedContainer))
+    new PodBuilder(resolvedPod)
+      .editSpec()
+        .addToContainers(resolvedContainer)
+        .endSpec()
+      .build()
   }
 }

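The hamcrest matcher added above is the standard way to match on one field of a rich argument under Mockito 1.x. Pulled out as a reusable helper, it reads like the sketch below; the helper name and wrapper object are ours, not the suite's:

    import org.hamcrest.{BaseMatcher, Description}
    import org.mockito.Matchers

    import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf}

    object ExecutorConfMatcherSketch {
      // Matches any executor KubernetesConf whose roleSpecificConf carries the
      // given executor ID. describeTo is required by the Matcher interface and
      // is left empty, exactly as in the suite.
      def confWithExecutorId(executorId: String): KubernetesConf[KubernetesExecutorSpecificConf] =
        Matchers.argThat(new BaseMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] {
          override def matches(argument: scala.Any): Boolean =
            argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]] &&
              argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]
                .roleSpecificConf.executorId == executorId

          override def describeTo(description: Description): Unit = {}
        })
    }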
http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
new file mode 100644
index 0000000..f527062
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, KubernetesFeaturesTestUtils, MountSecretsFeatureStep}
+
+class KubernetesExecutorBuilderSuite extends SparkFunSuite {
+  private val BASIC_STEP_TYPE = "basic"
+  private val SECRETS_STEP_TYPE = "mount-secrets"
+
+  private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep])
+  private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
+
+  private val builderUnderTest = new KubernetesExecutorBuilder(
+    _ => basicFeatureStep,
+    _ => mountSecretsStep)
+
+  test("Basic steps are consistently applied.") {
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesExecutorSpecificConf(
+        "executor-id", new PodBuilder().build()),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty)
+    validateStepTypesApplied(builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE)
+  }
+
+  test("Apply secrets step if secrets are present.") {
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesExecutorSpecificConf(
+        "executor-id", new PodBuilder().build()),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map("secret" -> "secretMountPath"),
+      Map.empty)
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      SECRETS_STEP_TYPE)
+  }
+
+  private def validateStepTypesApplied(resolvedPod: SparkPod, stepTypes: String*): Unit = {
+    assert(resolvedPod.pod.getMetadata.getLabels.size === stepTypes.size)
+    stepTypes.foreach { stepType =>
+      assert(resolvedPod.pod.getMetadata.getLabels.get(stepType) === stepType)
+    }
+  }
+}

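The new suite exercises the feature-step composition pattern: the builder takes one provider function per step, always applies the basic step, applies the secrets step only when secrets are configured, and folds the steps over an initial pod. A hedged sketch of that control flow follows; the body is our reading of the pattern from the tests above, not the production source, and the roleSecretNamesToMountPaths accessor is assumed from the KubernetesConf argument order used in the suites.

    import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
    import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

    object ExecutorBuilderSketch {
      type ExecutorConf = KubernetesConf[KubernetesExecutorSpecificConf]

      // Apply the basic step unconditionally and the secrets step only when
      // secret mounts are configured, folding each step's configurePod over the
      // initial pod.
      def buildFromFeatures(
          conf: ExecutorConf,
          provideBasicStep: ExecutorConf => KubernetesFeatureConfigStep,
          provideSecretsStep: ExecutorConf => KubernetesFeatureConfigStep): SparkPod = {
        val steps = if (conf.roleSecretNamesToMountPaths.nonEmpty) {
          Seq(provideBasicStep(conf), provideSecretsStep(conf))
        } else {
          Seq(provideBasicStep(conf))
        }
        steps.foldLeft(SparkPod.initialPod()) { (pod, step) => step.configurePod(pod) }
      }
    }

The two tests above then reduce to checking which step types stamped their labels onto the resolved pod.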
