pgillet commented on a change in pull request #30352:
URL: https://github.com/apache/spark/pull/30352#discussion_r523831478
##########
File path:
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
##########
@@ -567,40 +567,160 @@ class MesosClusterSchedulerSuite extends SparkFunSuite
with LocalSparkContext wi
}
test("assembles a valid driver command, escaping all confs and args") {
- setScheduler()
+ setScheduler()
+
+ val mem = 1000
+ val cpu = 1
+ val driverDesc = new MesosDriverDescription(
+ "d1",
+ "jar",
+ mem,
+ cpu,
+ true,
+ new Command(
+ "Main",
+ Seq("--a=$2", "--b", "x y z"),
+ Map(),
+ Seq(),
+ Seq(),
+ Seq()),
+ Map("spark.app.name" -> "app name",
+ config.EXECUTOR_URI.key -> "s3a://bucket/spark-version.tgz",
+ "another.conf" -> "\\value"),
+ "s1",
+ new Date())
+
+ val expectedCmd = "cd spark-version*; " +
+      "bin/spark-submit --name \"app name\" --master mesos://mesos://localhost:5050 " +
+ "--driver-cores 1.0 --driver-memory 1000M --class Main " +
+ "--conf \"another.conf=\\\\value\" " +
+ "--conf \"spark.app.name=app name\" " +
+ "--conf spark.executor.uri=s3a://bucket/spark-version.tgz " +
+ "../jar " +
+ "\"--a=\\$2\" " +
+ "--b \"x y z\""
+
+ assert(scheduler.getDriverCommandValue(driverDesc) == expectedCmd)
+ }
+
+ test("Get driver priority") {
+ val conf = new SparkConf()
+ conf.set("spark.mesos.dispatcher.queue.ROUTINE", "1.0")
+ conf.set("spark.mesos.dispatcher.queue.URGENT", "2.0")
+ conf.set("spark.mesos.dispatcher.queue.EXCEPTIONAL", "3.0")
+ setScheduler(conf.getAll.toMap)
val mem = 1000
val cpu = 1
- val driverDesc = new MesosDriverDescription(
- "d1",
- "jar",
- mem,
- cpu,
- true,
- new Command(
- "Main",
- Seq("--a=$2", "--b", "x y z"),
- Map(),
- Seq(),
- Seq(),
- Seq()),
- Map("spark.app.name" -> "app name",
- config.EXECUTOR_URI.key -> "s3a://bucket/spark-version.tgz",
- "another.conf" -> "\\value"),
- "s1",
+
+ // Test queue not declared in scheduler
+ var desc = new MesosDriverDescription("d1", "jar", mem, cpu, true,
+ command,
+ Map("spark.mesos.dispatcher.queue" -> "dummy"),
+ "s1",
+ new Date())
+
+ assertThrows[NoSuchElementException] {
+ scheduler.getDriverPriority(desc)
+ }
+
+ // Test with no specified queue
+ desc = new MesosDriverDescription("d1", "jar", mem, cpu, true,
+ command,
+ Map[String, String](),
+ "s2",
+ new Date())
+
+ assert(scheduler.getDriverPriority(desc) == 0.0f)
+
+ // Test with "default" queue specified
+ desc = new MesosDriverDescription("d1", "jar", mem, cpu, true,
+ command,
+ Map("spark.mesos.dispatcher.queue" -> "default"),
+ "s3",
+ new Date())
+
+ assert(scheduler.getDriverPriority(desc) == 0.0f)
+
+ // Test queue declared in scheduler
+ desc = new MesosDriverDescription("d1", "jar", mem, cpu, true,
+ command,
+ Map("spark.mesos.dispatcher.queue" -> "ROUTINE"),
+ "s4",
+ new Date())
+
+ assert(scheduler.getDriverPriority(desc) == 1.0f)
+
+ // Test other queue declared in scheduler
+ desc = new MesosDriverDescription("d1", "jar", mem, cpu, true,
+ command,
+ Map("spark.mesos.dispatcher.queue" -> "URGENT"),
+ "s5",
new Date())
- val expectedCmd = "cd spark-version*; " +
-      "bin/spark-submit --name \"app name\" --master mesos://mesos://localhost:5050 " +
- "--driver-cores 1.0 --driver-memory 1000M --class Main " +
- "--conf \"another.conf=\\\\value\" " +
- "--conf \"spark.app.name=app name\" " +
- "--conf spark.executor.uri=s3a://bucket/spark-version.tgz " +
- "../jar " +
- "\"--a=\\$2\" " +
- "--b \"x y z\""
-
- assert(scheduler.getDriverCommandValue(driverDesc) == expectedCmd)
+ assert(scheduler.getDriverPriority(desc) == 2.0f)
+ }
+
+ test("Can queue drivers with priority") {
+ val conf = new SparkConf()
+ conf.set("spark.mesos.dispatcher.queue.ROUTINE", "1.0")
+ conf.set("spark.mesos.dispatcher.queue.URGENT", "2.0")
+ conf.set("spark.mesos.dispatcher.queue.EXCEPTIONAL", "3.0")
+ setScheduler(conf.getAll.toMap)
+
+ val mem = 1000
+ val cpu = 1
+
+ val response0 = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+ Map("spark.mesos.dispatcher.queue" -> "ROUTINE"), "s0", new Date()))
+ assert(response0.success)
+
+ val response1 = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+ Map[String, String](), "s1", new Date()))
+ assert(response1.success)
+
+ val response2 = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+        Map("spark.mesos.dispatcher.queue" -> "EXCEPTIONAL"), "s2", new Date()))
+ assert(response2.success)
+
+ val response3 = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+ Map("spark.mesos.dispatcher.queue" -> "URGENT"), "s3", new Date()))
+ assert(response3.success)
+
+ val state = scheduler.getSchedulerState()
+ val queuedDrivers = state.queuedDrivers.toList
+ assert(queuedDrivers(0).submissionId == response2.submissionId)
+ assert(queuedDrivers(1).submissionId == response3.submissionId)
+ assert(queuedDrivers(2).submissionId == response0.submissionId)
+ assert(queuedDrivers(3).submissionId == response1.submissionId)
+ }
+
+ test("Can queue drivers with negative priority") {
Review comment:
Prefixed :heavy_check_mark:
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]