tgravescs commented on a change in pull request #25047:
[WIP][SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone
URL: https://github.com/apache/spark/pull/25047#discussion_r311067074
##########
File path: core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
##########
@@ -218,6 +243,105 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
     }
   }
+  test("worker could be launched without any resources") {
+    val worker = makeWorker()
+    worker.rpcEnv.setupEndpoint("worker", worker)
+    eventually(timeout(10.seconds)) {
+      assert(worker.resources === Map.empty)
+      worker.rpcEnv.shutdown()
+      worker.rpcEnv.awaitTermination()
+    }
+    assertResourcesFileDeleted()
+  }
+
+  test("worker could load resources from resources file while launching") {
+    val conf = new SparkConf()
+    withTempDir { dir =>
+      val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("0", "1"))
+      val fpgaArgs =
+        ResourceAllocation(WORKER_FPGA_ID, Seq("f1", "f2", "f3"))
+      val ja = Extraction.decompose(Seq(gpuArgs, fpgaArgs))
+      val f1 = createTempJsonFile(dir, "resources", ja)
+      conf.set(SPARK_WORKER_RESOURCE_FILE.key, f1)
+      conf.set(WORKER_GPU_ID.amountConf, "2")
+      conf.set(WORKER_FPGA_ID.amountConf, "3")
+      val worker = makeWorker(conf)
+      worker.rpcEnv.setupEndpoint("worker", worker)
+      eventually(timeout(10.seconds)) {
+        assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation,
+          FPGA -> fpgaArgs.toResourceInformation))
+        worker.rpcEnv.shutdown()
+        worker.rpcEnv.awaitTermination()
+      }
+      assertResourcesFileDeleted()
+    }
+  }
+
+  test("worker could load resources from discovery script while launching") {
+    val conf = new SparkConf()
+    withTempDir { dir =>
+      val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
+        """{"name": "fpga","addresses":["f1", "f2", "f3"]}""")
+      conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
+      conf.set(WORKER_FPGA_ID.amountConf, "3")
+      val worker = makeWorker(conf)
+      worker.rpcEnv.setupEndpoint("worker", worker)
+      eventually(timeout(10.seconds)) {
+        assert(worker.resources === Map(FPGA ->
+          new ResourceInformation(FPGA, Array("f1", "f2", "f3"))))
+        worker.rpcEnv.shutdown()
+        worker.rpcEnv.awaitTermination()
+      }
+      assertResourcesFileDeleted()
+    }
+  }
+
+  test("worker could load resources from resources file and discovery script while launching") {
+    val conf = new SparkConf()
+    withTempDir { dir =>
+      val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("0", "1"))
+      val ja = Extraction.decompose(Seq(gpuArgs))
+      val resourcesPath = createTempJsonFile(dir, "resources", ja)
+      val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
+        """{"name": "fpga","addresses":["f1", "f2", "f3"]}""")
+      conf.set(SPARK_WORKER_RESOURCE_FILE.key, resourcesPath)
+      conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
+      conf.set(WORKER_FPGA_ID.amountConf, "3")
+      conf.set(WORKER_GPU_ID.amountConf, "2")
+      val worker = makeWorker(conf)
+      worker.rpcEnv.setupEndpoint("worker", worker)
+      eventually(timeout(10.seconds)) {
+        assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation,
+          FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3"))))
+        worker.rpcEnv.shutdown()
+        worker.rpcEnv.awaitTermination()
+      }
+      assertResourcesFileDeleted()
+    }
+  }
+
+  test("Workers should avoid resources conflict when launch from the same host") {
Review comment:
would be nice to add a test with SPARK_RESOURCES_COORDINATE off to make
sure all the resources from the file/discovery script are returned properly
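
A rough sketch of the suggested test (not part of this PR), following the pattern of the combined file + discovery test above. It assumes SPARK_RESOURCES_COORDINATE is the boolean config entry being referred to and reuses the suite's existing helpers (makeWorker, createTempJsonFile, createTempScriptWithExpectedOutput):

  test("worker could load all resources when resources coordination is off") {
    // Assumption: SPARK_RESOURCES_COORDINATE is the boolean config the review comment refers to.
    val conf = new SparkConf().set(SPARK_RESOURCES_COORDINATE, false)
    withTempDir { dir =>
      val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("0", "1"))
      val ja = Extraction.decompose(Seq(gpuArgs))
      val resourcesPath = createTempJsonFile(dir, "resources", ja)
      val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
        """{"name": "fpga","addresses":["f1", "f2", "f3"]}""")
      conf.set(SPARK_WORKER_RESOURCE_FILE.key, resourcesPath)
      conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
      conf.set(WORKER_FPGA_ID.amountConf, "3")
      conf.set(WORKER_GPU_ID.amountConf, "2")
      val worker = makeWorker(conf)
      worker.rpcEnv.setupEndpoint("worker", worker)
      eventually(timeout(10.seconds)) {
        // With coordination disabled, everything from the resources file and the
        // discovery script should still be reported by the worker.
        assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation,
          FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3"))))
        worker.rpcEnv.shutdown()
        worker.rpcEnv.awaitTermination()
      }
    }
  }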