holdenk commented on a change in pull request #26440: [SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support
URL: https://github.com/apache/spark/pull/26440#discussion_r373192554
##########
File path:
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
##########
@@ -264,60 +267,120 @@ class KubernetesSuite extends SparkFunSuite
appLocator: String,
isJVM: Boolean,
pyFiles: Option[String] = None,
- executorPatience: Option[(Option[Interval], Option[Timeout])] = None): Unit = {
+ executorPatience: Option[(Option[Interval], Option[Timeout])] = None,
+ decommissioningTest: Boolean = false): Unit = {
+
+ // scalastyle:on argcount
val appArguments = SparkAppArguments(
mainAppResource = appResource,
mainClass = mainClass,
appArgs = appArgs)
- SparkAppLauncher.launch(
- appArguments,
- sparkAppConf,
- TIMEOUT.value.toSeconds.toInt,
- sparkHomeDir,
- isJVM,
- pyFiles)
- val driverPod = kubernetesTestComponents.kubernetesClient
- .pods()
- .withLabel("spark-app-locator", appLocator)
- .withLabel("spark-role", "driver")
- .list()
- .getItems
- .get(0)
- driverPodChecker(driverPod)
val execPods = scala.collection.mutable.Map[String, Pod]()
+ val (patienceInterval, patienceTimeout) = {
+ executorPatience match {
+ case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT))
+ case _ => (INTERVAL, TIMEOUT)
+ }
+ }
+ def checkPodReady(namespace: String, name: String) = {
+ val execPod = kubernetesTestComponents.kubernetesClient
+ .pods()
+ .inNamespace(namespace)
+ .withName(name)
+ .get()
+ val resourceStatus = execPod.getStatus
+ val conditions = resourceStatus.getConditions().asScala
+ val conditionTypes = conditions.map(_.getType())
+ val readyConditions = conditions.filter{cond => cond.getType() == "Ready"}
+ val result = readyConditions
+ .map(cond => cond.getStatus() == "True")
+ .headOption.getOrElse(false)
+ result
+ }
val execWatcher = kubernetesTestComponents.kubernetesClient
.pods()
.withLabel("spark-app-locator", appLocator)
.withLabel("spark-role", "executor")
.watch(new Watcher[Pod] {
- logInfo("Beginning watch of executors")
+ logDebug("Beginning watch of executors")
override def onClose(cause: KubernetesClientException): Unit =
logInfo("Ending watch of executors")
override def eventReceived(action: Watcher.Action, resource: Pod): Unit = {
val name = resource.getMetadata.getName
+ val namespace = resource.getMetadata().getNamespace()
action match {
- case Action.ADDED | Action.MODIFIED =>
+ case Action.MODIFIED =>
+ execPods(name) = resource
+ case Action.ADDED =>
+ logDebug(s"Add event received for $name.")
execPods(name) = resource
+ // If testing decommissioning start a thread to simulate
+ // decommissioning.
+ if (decommissioningTest && execPods.size == 1) {
+ // Wait for all the containers in the pod to be running
+ logDebug("Waiting for first pod to become OK prior to deletion")
+ Eventually.eventually(patienceTimeout, patienceInterval) {
+ val result = checkPodReady(namespace, name)
+ result shouldBe (true)
+ }
+ // Sleep a small interval to allow execution of job
+ logDebug("Sleeping before killing pod.")
+ Thread.sleep(2000)
+ // Delete the pod to simulate cluster scale down/migration.
+ val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name)
+ pod.delete()
+ logDebug(s"Triggered pod decom/delete: $name deleted")
+ }
case Action.DELETED | Action.ERROR =>
execPods.remove(name)
}
}
})
- val (patienceInterval, patienceTimeout) = {
- executorPatience match {
- case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT))
- case _ => (INTERVAL, TIMEOUT)
- }
- }
+ logDebug("Starting Spark K8s job")
+ SparkAppLauncher.launch(
+ appArguments,
+ sparkAppConf,
+ TIMEOUT.value.toSeconds.toInt,
+ sparkHomeDir,
+ isJVM,
+ pyFiles)
+ val driverPod = kubernetesTestComponents.kubernetesClient
+ .pods()
+ .withLabel("spark-app-locator", appLocator)
+ .withLabel("spark-role", "driver")
+ .list()
+ .getItems
+ .get(0)
+
+ driverPodChecker(driverPod)
+ // If we're testing decommissioning we delete all the executors, but we should have
+ // an executor at some point.
Eventually.eventually(patienceTimeout, patienceInterval) {
execPods.values.nonEmpty should be (true)
}
+ // If decommissioning we need to wait and check the executors were removed
+ if (decommissioningTest) {
+ // Sleep a small interval to ensure everything is registered.
+ Thread.sleep(100)
+ // Wait for the executors to become ready
+ Eventually.eventually(patienceTimeout, patienceInterval) {
+ val anyReadyPods = ! execPods.map{
+ case (name, resource) =>
+ (name, resource.getMetadata().getNamespace())
+ }.filter{
+ case (name, namespace) => checkPodReady(namespace, name)
Review comment:
Shelling out here isn't great, is it?
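
For context on the comment above: the diff's checkPodReady helper and the pod deletion both stay inside the fabric8 Kubernetes client API rather than forking a kubectl subprocess. A minimal standalone sketch of that pattern follows; the DecomTestHelpers object and the namespace/pod names are illustrative assumptions, not code from the PR.

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient}

// Hypothetical helper object; mirrors the pattern in the diff above but is
// not part of the PR.
object DecomTestHelpers {

  // Readiness check done entirely in-process: fetch the pod's status through
  // the client and look for a "Ready" condition whose status is "True".
  def checkPodReady(client: KubernetesClient, namespace: String, name: String): Boolean = {
    val pod = client.pods().inNamespace(namespace).withName(name).get()
    Option(pod).flatMap { p =>
      p.getStatus.getConditions.asScala.find(_.getType == "Ready")
    }.exists(_.getStatus == "True")
  }

  // Simulate a cluster scale-down/migration by deleting the pod, again via
  // the client API instead of a kubectl subprocess.
  def deletePod(client: KubernetesClient, namespace: String, name: String): Unit = {
    client.pods().inNamespace(namespace).withName(name).delete()
  }

  def main(args: Array[String]): Unit = {
    // DefaultKubernetesClient picks up the usual kubeconfig/in-cluster config.
    val client = new DefaultKubernetesClient()
    try {
      // "default" and "example-exec-pod" are placeholder values.
      if (checkPodReady(client, "default", "example-exec-pod")) {
        deletePod(client, "default", "example-exec-pod")
      }
    } finally {
      client.close()
    }
  }
}

Run against a live cluster, this behaves like the inline watcher logic in the diff: poll for readiness (for example inside Eventually.eventually, as the test does) and then trigger the delete to simulate decommissioning.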