ifilonenko commented on a change in pull request #26440:
[SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support
URL: https://github.com/apache/spark/pull/26440#discussion_r373153511
##########
File path: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
##########
@@ -264,60 +267,120 @@ class KubernetesSuite extends SparkFunSuite
       appLocator: String,
       isJVM: Boolean,
       pyFiles: Option[String] = None,
-      executorPatience: Option[(Option[Interval], Option[Timeout])] = None): Unit = {
+      executorPatience: Option[(Option[Interval], Option[Timeout])] = None,
+      decommissioningTest: Boolean = false): Unit = {
+
+    // scalastyle:on argcount
     val appArguments = SparkAppArguments(
       mainAppResource = appResource,
       mainClass = mainClass,
       appArgs = appArgs)
-    SparkAppLauncher.launch(
-      appArguments,
-      sparkAppConf,
-      TIMEOUT.value.toSeconds.toInt,
-      sparkHomeDir,
-      isJVM,
-      pyFiles)
-    val driverPod = kubernetesTestComponents.kubernetesClient
-      .pods()
-      .withLabel("spark-app-locator", appLocator)
-      .withLabel("spark-role", "driver")
-      .list()
-      .getItems
-      .get(0)
-    driverPodChecker(driverPod)
     val execPods = scala.collection.mutable.Map[String, Pod]()
+    val (patienceInterval, patienceTimeout) = {
+      executorPatience match {
+        case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT))
+        case _ => (INTERVAL, TIMEOUT)
+      }
+    }
+    def checkPodReady(namespace: String, name: String) = {
+      val execPod = kubernetesTestComponents.kubernetesClient
+        .pods()
+        .inNamespace(namespace)
+        .withName(name)
+        .get()
+      val resourceStatus = execPod.getStatus
+      val conditions = resourceStatus.getConditions().asScala
+      val conditionTypes = conditions.map(_.getType())
+      val readyConditions = conditions.filter{cond => cond.getType() == "Ready"}
+      val result = readyConditions
+        .map(cond => cond.getStatus() == "True")
+        .headOption.getOrElse(false)
+      result
+    }
     val execWatcher = kubernetesTestComponents.kubernetesClient
       .pods()
       .withLabel("spark-app-locator", appLocator)
       .withLabel("spark-role", "executor")
       .watch(new Watcher[Pod] {
-        logInfo("Beginning watch of executors")
+        logDebug("Beginning watch of executors")
         override def onClose(cause: KubernetesClientException): Unit =
           logInfo("Ending watch of executors")
         override def eventReceived(action: Watcher.Action, resource: Pod): Unit = {
           val name = resource.getMetadata.getName
+          val namespace = resource.getMetadata().getNamespace()
           action match {
-            case Action.ADDED | Action.MODIFIED =>
+            case Action.MODIFIED =>
+              execPods(name) = resource
+            case Action.ADDED =>
+              logDebug(s"Add event received for $name.")
               execPods(name) = resource
+              // If testing decommissioning start a thread to simulate
+              // decommissioning.
+              if (decommissioningTest && execPods.size == 1) {
+                // Wait for all the containers in the pod to be running
+                logDebug("Waiting for first pod to become OK prior to deletion")
+                Eventually.eventually(patienceTimeout, patienceInterval) {
+                  val result = checkPodReady(namespace, name)
+                  result shouldBe (true)
+                }
+                // Sleep a small interval to allow execution of job
+                logDebug("Sleeping before killing pod.")
Review comment:
Maybe trigger the delete upon seeing log output showing that the first `.collect()` happened, rather than sleeping for a fixed interval.
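
For example, something along these lines (a rough, untested sketch, not code from this PR). The driver-pod lookup and the `"collected"` log marker are assumptions standing in for however the driver pod is tracked here and for whatever line the test job prints after its first collect:

```scala
// Sketch: gate the pod delete on the driver log instead of a fixed sleep.
// This would run inside eventReceived, so `namespace`, `name`, `appLocator`,
// and the patience values are all in scope.
val driverPodName = kubernetesTestComponents.kubernetesClient
  .pods()
  .inNamespace(namespace)
  .withLabel("spark-app-locator", appLocator)
  .withLabel("spark-role", "driver")
  .list()
  .getItems
  .get(0)
  .getMetadata
  .getName
Eventually.eventually(patienceTimeout, patienceInterval) {
  val driverLog = kubernetesTestComponents.kubernetesClient
    .pods()
    .inNamespace(namespace)
    .withName(driverPodName)
    .getLog()
  // "collected" is a hypothetical marker the test app would log after its
  // first collect(); substitute whatever the job actually prints.
  assert(driverLog.contains("collected"), "first collect() not visible in driver log yet")
}
// First collect() observed; now delete the executor pod to simulate decommissioning.
kubernetesTestComponents.kubernetesClient
  .pods()
  .inNamespace(namespace)
  .withName(name)
  .delete()
```

That keeps the kill event-driven instead of racing a fixed sleep, which should be less flaky on slow CI nodes.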