Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19468#discussion_r145260359
--- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala ---
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Future
+
+import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder, PodList}
+import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource}
+import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.Matchers.{any, eq => mockitoEq}
+import org.mockito.Mockito.{mock => _, _}
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mock.MockitoSugar._
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc._
+import org.apache.spark.scheduler.{ExecutorExited, LiveListenerBus, SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+
+private[spark] class KubernetesClusterSchedulerBackendSuite
+ extends SparkFunSuite with BeforeAndAfter {
+
+ private val APP_ID = "test-spark-app"
+ private val DRIVER_POD_NAME = "spark-driver-pod"
+ private val NAMESPACE = "test-namespace"
+ private val SPARK_DRIVER_HOST = "localhost"
+ private val SPARK_DRIVER_PORT = 7077
+ private val POD_ALLOCATION_INTERVAL = 60L
+ private val DRIVER_URL = RpcEndpointAddress(
+    SPARK_DRIVER_HOST, SPARK_DRIVER_PORT, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+ private val FIRST_EXECUTOR_POD = new PodBuilder()
+ .withNewMetadata()
+ .withName("pod1")
+ .endMetadata()
+ .withNewSpec()
+ .withNodeName("node1")
+ .endSpec()
+ .withNewStatus()
+ .withHostIP("192.168.99.100")
+ .endStatus()
+ .build()
+ private val SECOND_EXECUTOR_POD = new PodBuilder()
+ .withNewMetadata()
+ .withName("pod2")
+ .endMetadata()
+ .withNewSpec()
+ .withNodeName("node2")
+ .endSpec()
+ .withNewStatus()
+ .withHostIP("192.168.99.101")
+ .endStatus()
+ .build()
+
+  private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
+ private type LABELLED_PODS = FilterWatchListDeletable[
+ Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]
+ private type IN_NAMESPACE_PODS = NonNamespaceOperation[
+ Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
+
+ @Mock
+ private var sparkContext: SparkContext = _
+
+ @Mock
+ private var listenerBus: LiveListenerBus = _
+
+ @Mock
+ private var taskSchedulerImpl: TaskSchedulerImpl = _
+
+ @Mock
+ private var allocatorExecutor: ScheduledExecutorService = _
+
+ @Mock
+ private var requestExecutorsService: ExecutorService = _
+
+ @Mock
+ private var executorPodFactory: ExecutorPodFactory = _
+
+ @Mock
+ private var kubernetesClient: KubernetesClient = _
+
+ @Mock
+ private var podOperations: PODS = _
+
+ @Mock
+ private var podsWithLabelOperations: LABELLED_PODS = _
+
+ @Mock
+ private var podsInNamespace: IN_NAMESPACE_PODS = _
+
+ @Mock
+ private var podsWithDriverName: PodResource[Pod, DoneablePod] = _
+
+ @Mock
+ private var rpcEnv: RpcEnv = _
+
+ @Mock
+ private var driverEndpointRef: RpcEndpointRef = _
+
+ @Mock
+ private var executorPodsWatch: Watch = _
+
+ private var sparkConf: SparkConf = _
+ private var executorPodsWatcherArgument: ArgumentCaptor[Watcher[Pod]] = _
+ private var allocatorRunnable: ArgumentCaptor[Runnable] = _
+ private var requestExecutorRunnable: ArgumentCaptor[Runnable] = _
+ private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _
+
+ private val driverPod = new PodBuilder()
+ .withNewMetadata()
+ .withName(DRIVER_POD_NAME)
+ .addToLabels(SPARK_APP_ID_LABEL, APP_ID)
+ .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
+ .endMetadata()
+ .build()
+
+ before {
+ MockitoAnnotations.initMocks(this)
+ sparkConf = new SparkConf()
+ .set("spark.app.id", APP_ID)
+ .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME)
+ .set(KUBERNETES_NAMESPACE, NAMESPACE)
+ .set("spark.driver.host", SPARK_DRIVER_HOST)
+ .set("spark.driver.port", SPARK_DRIVER_PORT.toString)
+ .set(KUBERNETES_ALLOCATION_BATCH_DELAY, POD_ALLOCATION_INTERVAL)
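+    // Capture the watcher, runnables, and driver endpoint that the backend
+    // registers on these mocks, so each test can drive them directly.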
+    executorPodsWatcherArgument = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
+ allocatorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
+ requestExecutorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
+ driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
+ when(sparkContext.conf).thenReturn(sparkConf)
+ when(sparkContext.listenerBus).thenReturn(listenerBus)
+ when(taskSchedulerImpl.sc).thenReturn(sparkContext)
+ when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.withLabel(SPARK_APP_ID_LABEL, APP_ID)).thenReturn(podsWithLabelOperations)
+    when(podsWithLabelOperations.watch(executorPodsWatcherArgument.capture()))
+ .thenReturn(executorPodsWatch)
+ when(podOperations.inNamespace(NAMESPACE)).thenReturn(podsInNamespace)
+    when(podsInNamespace.withName(DRIVER_POD_NAME)).thenReturn(podsWithDriverName)
+ when(podsWithDriverName.get()).thenReturn(driverPod)
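+    // Capture the allocation task instead of letting it run on a real timer;
+    // tests invoke allocatorRunnable.getValue.run() to trigger a pass.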
+ when(allocatorExecutor.scheduleWithFixedDelay(
+ allocatorRunnable.capture(),
+ mockitoEq(0L),
+ mockitoEq(POD_ALLOCATION_INTERVAL),
+ mockitoEq(TimeUnit.SECONDS))).thenReturn(null)
+    // Creating Futures in Scala backed by a Java executor service resolves to running
+ // ExecutorService#execute (as opposed to submit)
+    doNothing().when(requestExecutorsService).execute(requestExecutorRunnable.capture())
+ when(rpcEnv.setupEndpoint(
+      mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
+ .thenReturn(driverEndpointRef)
+ when(driverEndpointRef.ask[Boolean]
+ (any(classOf[Any]))
+ (any())).thenReturn(mock[Future[Boolean]])
+ }
+
+  test("Basic lifecycle expectations when starting and stopping the scheduler.") {
+ val scheduler = newSchedulerBackend()
+ scheduler.start()
+ assert(executorPodsWatcherArgument.getValue != null)
+ assert(allocatorRunnable.getValue != null)
+ scheduler.stop()
+ verify(executorPodsWatch).close()
+ }
+
+  test("Static allocation should request executors upon first allocator run.") {
+ sparkConf
+ .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
+ .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
+ val scheduler = newSchedulerBackend()
+ scheduler.start()
+ requestExecutorRunnable.getValue.run()
+ expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
+ expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
+    when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
+ allocatorRunnable.getValue.run()
+ verify(podOperations).create(FIRST_EXECUTOR_POD)
+ verify(podOperations).create(SECOND_EXECUTOR_POD)
+ }
+
+ test("Killing executors deletes the executor pods") {
+ sparkConf
+ .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
+ .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
+ val scheduler = newSchedulerBackend()
+ scheduler.start()
+ requestExecutorRunnable.getValue.run()
+ expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
+ expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
+ when(podOperations.create(any(classOf[Pod])))
+ .thenAnswer(AdditionalAnswers.returnsFirstArg())
+ allocatorRunnable.getValue.run()
+ scheduler.doKillExecutors(Seq("2"))
+ requestExecutorRunnable.getAllValues.asScala.last.run()
+ verify(podOperations).delete(SECOND_EXECUTOR_POD)
+ verify(podOperations, never()).delete(FIRST_EXECUTOR_POD)
+ }
+
+ test("Executors should be requested in batches.") {
+ sparkConf
+ .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
+ .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
+ val scheduler = newSchedulerBackend()
+ scheduler.start()
+ requestExecutorRunnable.getValue.run()
+ when(podOperations.create(any(classOf[Pod])))
+ .thenAnswer(AdditionalAnswers.returnsFirstArg())
+ expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
+ expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
+ allocatorRunnable.getValue.run()
+ verify(podOperations).create(FIRST_EXECUTOR_POD)
+ verify(podOperations, never()).create(SECOND_EXECUTOR_POD)
+ val registerFirstExecutorMessage = RegisterExecutor(
+      "1", mock[RpcEndpointRef], "localhost", 1, Map.empty[String, String])
+ when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
+ driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
+ .apply(registerFirstExecutorMessage)
+ allocatorRunnable.getValue.run()
+ verify(podOperations).create(SECOND_EXECUTOR_POD)
+ }
+
+  test("Deleting executors and then running an allocator pass after finding the loss reason" +
--- End diff ---
shorter test name?
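
For example (hypothetical wording, just to sketch the length I have in mind):

    test("Executor pods are deleted and re-requested after the loss reason is found")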
---