Yikun commented on a change in pull request #35639:
URL: https://github.com/apache/spark/pull/35639#discussion_r820180270



##########
File path: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
##########
@@ -190,6 +238,76 @@ private[spark] trait VolcanoTestsSuite { k8sSuite: 
KubernetesSuite =>
     }
     deleteYAMLResource(VOLCANO_ENABLE_Q0_AND_Q1_YAML)
   }
+
+  test("SPARK-38189: Run SparkPi Jobs with priorityClassName", k8sTestTag, 
volcanoTag) {
+    // Prepare the priority resource
+    createOrReplaceYAMLResource(VOLCANO_PRIORITY_YAML)
+    val priorities = Seq("low", "medium", "high")
+    val groupName = s"$GROUP_PREFIX-priority"
+    priorities.foreach { p =>
+      Future {
+        val templatePath = new File(
+          
getClass.getResource(s"/volcano/$p-priority-driver-template.yml").getFile
+        ).getAbsolutePath
+        runJobAndVerify(
+          p, groupLoc = Option(groupName),
+          driverTemplate = Option(templatePath)
+        )
+      }
+    }
+    // Make sure all jobs are Succeeded
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+        val pods = getPods(role = "driver", groupName, statusPhase = 
"Succeeded")
+        assert(pods.size === priorities.size)
+    }
+    // TODO: rebase this
+    deleteYAMLResource(VOLCANO_PRIORITY_YAML)
+  }
+
+  test("SPARK-38189: Run driver job to validate priority order", k8sTestTag, 
volcanoTag) {
+    // Prepare the priority resource
+    createOrReplaceYAMLResource(ENABLE_QUEUE)
+    createOrReplaceYAMLResource(VOLCANO_PRIORITY_YAML)
+    // Submit 3 jobs with different priority
+    val priorities = Seq("low", "medium", "high")
+    priorities.foreach { p =>
+      Future {
+        val templatePath = new File(
+          
getClass.getResource(s"/volcano/$p-priority-driver-template.yml").getFile
+        ).getAbsolutePath
+        runJobAndVerify(
+          p, groupLoc = Option(s"$GROUP_PREFIX-$p"),
+          queue = Option("queue"),
+          driverTemplate = Option(templatePath),
+          isDriverJob = true
+        )
+      }
+    }
+    // Make sure 3 jobs are pending
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      priorities.foreach { p =>
+        val pods = getPods(role = "driver", s"$GROUP_PREFIX-$p", statusPhase = 
"Pending")
+        assert(pods.size === 1)
+      }
+    }
+    // Enable queue to let job enqueue
+    createOrReplaceYAMLResource(ENABLE_QUEUE)
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      var m = Map.empty[String, Instant]
+      priorities.foreach { p =>
+        val pods = getPods(role = "driver", s"$GROUP_PREFIX-$p", statusPhase = 
"Succeeded")
+        val conditions = pods.head.getStatus.getConditions.asScala
+        val scheduledTime
+          = conditions.filter(_.getType === 
"PodScheduled").head.getLastTransitionTime
+        m += (p -> Instant.parse(scheduledTime))
+      }
+      assert(m("high").isBefore(m("medium")))
+      assert(m("medium").isBefore(m("low")))
+    }
+    // TODO: rebase this

Review comment:
       after this is merged: https://github.com/apache/spark/pull/35733
   
   we can clean up this YAML.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to