holdenk commented on a change in pull request #26440: [SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support
URL: https://github.com/apache/spark/pull/26440#discussion_r379632318
 
 

 ##########
 File path: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
 ##########
 @@ -264,60 +267,120 @@ class KubernetesSuite extends SparkFunSuite
       appLocator: String,
       isJVM: Boolean,
       pyFiles: Option[String] = None,
-      executorPatience: Option[(Option[Interval], Option[Timeout])] = None): Unit = {
+      executorPatience: Option[(Option[Interval], Option[Timeout])] = None,
+      decommissioningTest: Boolean = false): Unit = {
+
+  // scalastyle:on argcount
     val appArguments = SparkAppArguments(
       mainAppResource = appResource,
       mainClass = mainClass,
       appArgs = appArgs)
-    SparkAppLauncher.launch(
-      appArguments,
-      sparkAppConf,
-      TIMEOUT.value.toSeconds.toInt,
-      sparkHomeDir,
-      isJVM,
-      pyFiles)
 
-    val driverPod = kubernetesTestComponents.kubernetesClient
-      .pods()
-      .withLabel("spark-app-locator", appLocator)
-      .withLabel("spark-role", "driver")
-      .list()
-      .getItems
-      .get(0)
-    driverPodChecker(driverPod)
     val execPods = scala.collection.mutable.Map[String, Pod]()
+    val (patienceInterval, patienceTimeout) = {
+      executorPatience match {
+        case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT))
+        case _ => (INTERVAL, TIMEOUT)
+      }
+    }
+    def checkPodReady(namespace: String, name: String) = {
+      val execPod = kubernetesTestComponents.kubernetesClient
+        .pods()
+        .inNamespace(namespace)
+        .withName(name)
+        .get()
+      val resourceStatus = execPod.getStatus
+      val conditions = resourceStatus.getConditions().asScala
+      val conditionTypes = conditions.map(_.getType())
+      val readyConditions = conditions.filter{cond => cond.getType() == "Ready"}
+      val result = readyConditions
+        .map(cond => cond.getStatus() == "True")
+        .headOption.getOrElse(false)
+      result
+    }
     val execWatcher = kubernetesTestComponents.kubernetesClient
       .pods()
       .withLabel("spark-app-locator", appLocator)
       .withLabel("spark-role", "executor")
       .watch(new Watcher[Pod] {
-        logInfo("Beginning watch of executors")
+        logDebug("Beginning watch of executors")
         override def onClose(cause: KubernetesClientException): Unit =
           logInfo("Ending watch of executors")
         override def eventReceived(action: Watcher.Action, resource: Pod): Unit = {
           val name = resource.getMetadata.getName
+          val namespace = resource.getMetadata().getNamespace()
           action match {
-            case Action.ADDED | Action.MODIFIED =>
+            case Action.MODIFIED =>
+              execPods(name) = resource
+            case Action.ADDED =>
+              logDebug(s"Add event received for $name.")
               execPods(name) = resource
+              // If testing decommissioning start a thread to simulate
+              // decommissioning.
+              if (decommissioningTest && execPods.size == 1) {
+                // Wait for all the containers in the pod to be running
+                logDebug("Waiting for first pod to become OK prior to 
deletion")
+                Eventually.eventually(patienceTimeout, patienceInterval) {
+                  val result = checkPodReady(namespace, name)
+                  result shouldBe (true)
+                }
+                // Sleep a small interval to allow execution of job
+                logDebug("Sleeping before killing pod.")
+                Thread.sleep(2000)
+                // Delete the pod to simulate cluster scale down/migration.
+                val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name)
+                pod.delete()
+                logDebug(s"Triggered pod decom/delete: $name deleted")
+              }
             case Action.DELETED | Action.ERROR =>
               execPods.remove(name)
           }
         }
       })
 
-    val (patienceInterval, patienceTimeout) = {
-      executorPatience match {
-        case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT))
-        case _ => (INTERVAL, TIMEOUT)
-      }
-    }
+    logDebug("Starting Spark K8s job")
+    SparkAppLauncher.launch(
+      appArguments,
+      sparkAppConf,
+      TIMEOUT.value.toSeconds.toInt,
+      sparkHomeDir,
+      isJVM,
+      pyFiles)
 
+    val driverPod = kubernetesTestComponents.kubernetesClient
+      .pods()
+      .withLabel("spark-app-locator", appLocator)
+      .withLabel("spark-role", "driver")
+      .list()
+      .getItems
+      .get(0)
+
+    driverPodChecker(driverPod)
+    // If we're testing decommissioning we delete all the executors, but we should have
+    // an executor at some point.
     Eventually.eventually(patienceTimeout, patienceInterval) {
       execPods.values.nonEmpty should be (true)
     }
+    // If decommissioning we need to wait and check the executors were removed
+    if (decommissioningTest) {
+      // Sleep a small interval to ensure everything is registered.
+      Thread.sleep(100)
+      // Wait for the executors to become ready
+      Eventually.eventually(patienceTimeout, patienceInterval) {
+        val anyReadyPods = ! execPods.map{
+          case (name, resource) =>
+            (name, resource.getMetadata().getNamespace())
+        }.filter{
+          case (name, namespace) => checkPodReady(namespace, name)
 
 Review comment:
   Filed a follow-up for this improvement.
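
   The check quoted above builds `anyReadyPods` by mapping `execPods` to (name, namespace) pairs and then filtering on `checkPodReady`. The follow-up itself is not quoted here, so the exact change is an assumption; as a minimal sketch, assuming it targets that map/filter chain, the same readiness test could be expressed directly with `exists`, reusing the `execPods` map and `checkPodReady` helper defined earlier in the diff:

       // Hypothetical simplification; assumes the follow-up targets the
       // map/filter chain quoted above. execPods: mutable.Map[String, Pod]
       // and checkPodReady(namespace, name): Boolean come from the test code.
       val anyReadyPods = execPods.exists {
         case (name, resource) =>
           checkPodReady(resource.getMetadata().getNamespace(), name)
       }

   Besides being shorter, `exists` short-circuits on the first ready pod and avoids building the intermediate collections.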
