wbo4958 opened a new pull request, #44690:
URL: https://github.com/apache/spark/pull/44690

   ### What changes were proposed in this pull request?
   
   This PR tracks resource availability as fractions instead of slots, similar to the CPU strategy, when determining whether a worker offer can provide the necessary resources to tasks.
   
   For instance, when an executor reports to the driver with [gpu, ["1", "2"]], the driver constructs an executor data map. The keys in the map are the GPU addresses, and each value defaults to 1.0, indicating one whole GPU.
   
   Consequently, the available resource amounts for the executor are { "1" -> 1.0, "2" -> 1.0 }.
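   
   For illustration, a minimal sketch of that bookkeeping (the names here are assumptions, not the actual scheduler internals):
   
   ```scala
   import scala.collection.mutable
   
   // Addresses reported by the executor; each starts as one whole GPU (1.0).
   val reportedGpuAddresses = Seq("1", "2")
   val availableGpuAmounts: mutable.Map[String, Double] =
     mutable.Map(reportedGpuAddresses.map(addr => addr -> 1.0): _*)
   
   assert(availableGpuAmounts == mutable.Map("1" -> 1.0, "2" -> 1.0))
   ```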
   
   When offering resources to a task that requires 1 CPU and 0.08 GPU, the worker offer examines the available resource amounts. It finds that GPU address "1" has enough capacity for the task's GPU requirement (1.0 >= 0.08), so Spark assigns GPU address "1" to this task. After the assignment, the available resource amounts for this executor are updated to { "1" -> 0.92, "2" -> 1.0 }, so the remaining capacity can be allocated to other tasks.
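   
   A minimal sketch of that acquire step (the method and map here are illustrative assumptions, not the PR's actual code):
   
   ```scala
   import scala.collection.mutable
   
   // Acquire `amount` from the first address with enough remaining capacity;
   // returns the chosen address and updates the executor's bookkeeping.
   def tryAcquire(available: mutable.Map[String, Double],
                  amount: Double): Option[String] = {
     available.collectFirst { case (addr, left) if left >= amount => addr }
       .map { addr =>
         available(addr) -= amount  // e.g. address "1": 1.0 - 0.08 leaves ~0.92
         addr
       }
   }
   
   val available = mutable.LinkedHashMap("1" -> 1.0, "2" -> 1.0)
   assert(tryAcquire(available, 0.08).contains("1"))
   // available now holds { "1" -> ~0.92, "2" -> 1.0 }, matching the example.
   ```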
   
   When other tasks with different task resource profiles request varying GPU amounts while dynamic allocation is disabled, Spark applies the same comparison: it checks the task's GPU requirement against the available resource amounts to decide whether the resources can be assigned to the task.
   
   ### Why are the changes needed?
   
   The existing resource offering for resources such as GPU and FPGA is based on "slots per address", which is derived from the default resource profile and is a fixed number across all resource profiles when dynamic allocation is disabled.
   
   Consider the test case below:
   
   ```scala
     withTempDir { dir =>
       val scriptPath = createTempScriptWithExpectedOutput(dir, 
"gpuDiscoveryScript",
         """{"name": "gpu","addresses":["0"]}""")
   
       val conf = new SparkConf()
         .setAppName("test")
         .setMaster("local-cluster[1, 12, 1024]")
         .set("spark.executor.cores", "12")
         
       conf.set("spark.worker.resource.gpu.amount", "1")
       conf.set("spark.worker.resource.gpu.discoveryScript", scriptPath)
       conf.set("spark.executor.resource.gpu.amount", "1")
       conf.set("spark.task.resource.gpu.amount", "0.08")
       
       sc = new SparkContext(conf)
       val rdd = sc.range(0, 100, 1, 4)
       var rdd1 = rdd.repartition(3)
       val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0)
       val rp = new ResourceProfileBuilder().require(treqs).build
       rdd1 = rdd1.withResources(rp)
       assert(rdd1.collect().size === 100)
     }
   ```
   
   During the initial stages, Spark generates a default resource profile based on the configurations. The slots per GPU address are calculated as "spark.executor.resource.gpu.amount / spark.task.resource.gpu.amount", truncated to a whole number: floor(1 / 0.08) = 12. This means Spark can accommodate up to 12 tasks running on each GPU address simultaneously.
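   
   Spelled out as a sketch of the arithmetic (the actual internal code differs):
   
   ```scala
   val executorGpuAmount = 1.0  // spark.executor.resource.gpu.amount
   val taskGpuAmount = 0.08     // spark.task.resource.gpu.amount
   
   // The quotient is truncated to a whole number of slots per address.
   val slotsPerAddress = (executorGpuAmount / taskGpuAmount).toInt
   assert(slotsPerAddress == 12)  // up to 12 concurrent tasks per GPU address
   ```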
   
   The job is then divided into two stages. The first stage, consisting of 4 tasks, runs concurrently under the default resource profile. The second stage, comprising 3 tasks, should run sequentially under the new task resource profile, which specifies that each task requires 1 CPU and 1.0 full GPU.
   
   In reality, however, the tasks in the second stage run in parallel, which is the underlying issue.
   
   The problem lies in the line `new TaskResourceRequests().cpus(1).resource("gpu", 1.0)`. The value 1.0 for the GPU, like any fractional value in (0, 0.5] (which is rounded up to one slot; Spark throws an exception for values in (0.5, 1)), merely requests a number of slots, in this case a single slot out of the 12 per address. Consequently, each task needs only 1 CPU core and 1 GPU slot, so all the tasks run simultaneously.
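   
   To make the slot semantics concrete, here is an illustrative sketch of the old accounting (not Spark's actual code): any allowed amount up to 1.0 collapses to a single slot, so 0.08 and 1.0 are indistinguishable once the default profile has fixed 12 slots per address.
   
   ```scala
   // Old slot-based view: the default profile fixes 12 slots per GPU address,
   // and any allowed task amount <= 1.0 then occupies exactly one slot.
   def slotsRequested(taskGpuAmount: Double): Int = {
     require(!(taskGpuAmount > 0.5 && taskGpuAmount < 1.0),
       "fractional amounts in (0.5, 1) are rejected")
     1  // (0, 0.5] is rounded up to one slot, and 1.0 is also one slot
   }
   
   // 0.08 and 1.0 each occupy 1 of the 12 slots, so tasks run in parallel.
   assert(slotsRequested(0.08) == slotsRequested(1.0))
   ```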
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   
   ### How was this patch tested?
   
   Passed all existing tests.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No

