dongjoon-hyun commented on a change in pull request #35553:
URL: https://github.com/apache/spark/pull/35553#discussion_r816392731
##########
File path:
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
##########
@@ -37,12 +54,82 @@ private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
assert(annotations.get("scheduling.k8s.io/group-name") ===
s"$appId-podgroup")
}
- protected def checkPodGroup(pod: Pod): Unit = {
+ protected def checkPodGroup(
+ pod: Pod,
+ queue: Option[String] = None): Unit = {
val appId = pod.getMetadata.getLabels.get("spark-app-selector")
val podGroupName = s"$appId-podgroup"
- val volcanoClient = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
val podGroup = volcanoClient.podGroups().withName(podGroupName).get()
assert(podGroup.getMetadata.getOwnerReferences.get(0).getName ===
pod.getMetadata.getName)
+ val spec = podGroup.getSpec
+ if (queue.isDefined) assert(spec.getQueue === queue.get)
+ }
+
+ private def createOrReplaceYAMLResource(yamlPath: String): Unit = {
+ k8sClient.load(new FileInputStream(yamlPath)).createOrReplace()
+ }
+
+ private def deleteYAMLResource(yamlPath: String): Unit = {
+ k8sClient.load(new FileInputStream(yamlPath)).delete()
+ }
+
+ private def getPods(
+ role: String,
+ groupLocator: String,
+ statusPhase: String): mutable.Buffer[Pod] = {
+ k8sClient
+ .pods()
+ .withLabel("spark-group-locator", groupLocator)
+ .withLabel("spark-role", role)
+ .withField("status.phase", statusPhase)
+ .list()
+ .getItems.asScala
+ }
+
+ def runJobAndVerify(
+ batchSuffix: String,
+ groupLoc: Option[String] = None,
+ queue: Option[String] = None): Unit = {
+ val appLoc = s"${appLocator}${batchSuffix}"
+ val podName = s"${driverPodName}-${batchSuffix}"
+ // create new configuration for every job
+ val conf = createVolcanoSparkConf(podName, appLoc, groupLoc, queue)
+ runSparkPiAndVerifyCompletion(
+ driverPodChecker = (driverPod: Pod) => {
+ checkScheduler(driverPod)
+ checkAnnotaion(driverPod)
+ checkPodGroup(driverPod, queue)
+ },
+ executorPodChecker = (executorPod: Pod) => {
+ checkScheduler(executorPod)
+ checkAnnotaion(executorPod)
+ },
+ customSparkConf = Option(conf),
+ customAppLocator = Option(appLoc)
+ )
+ }
+
+ private def createVolcanoSparkConf(
+ driverPodName: String = driverPodName,
+ appLoc: String = appLocator,
+ groupLoc: Option[String] = None,
+ queue: Option[String] = None): SparkAppConf = {
+ val conf = kubernetesTestComponents.newSparkAppConf()
+ .set(CONTAINER_IMAGE.key, image)
+ .set(KUBERNETES_DRIVER_POD_NAME.key, driverPodName)
+ .set(s"${KUBERNETES_DRIVER_LABEL_PREFIX}spark-app-locator", appLoc)
+ .set(s"${KUBERNETES_EXECUTOR_LABEL_PREFIX}spark-app-locator", appLoc)
+ .set(NETWORK_AUTH_ENABLED.key, "true")
+ // below is volcano specific configuration
+ .set(KUBERNETES_SCHEDULER_NAME.key, "volcano")
+ .set(KUBERNETES_DRIVER_POD_FEATURE_STEPS.key, VOLCANO_FEATURE_STEP)
+ .set(KUBERNETES_EXECUTOR_POD_FEATURE_STEPS.key, VOLCANO_FEATURE_STEP)
+ if (queue.isDefined) conf.set(KUBERNETES_JOB_QUEUE.key, queue.get)
Review comment:
We prefer the following style, @Yikun.
```scala
- if (queue.isDefined) conf.set(KUBERNETES_JOB_QUEUE.key, queue.get)
+ queue.foreach(conf.set(KUBERNETES_JOB_QUEUE.key, _))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]