wbo4958 commented on PR #44690:
URL: https://github.com/apache/spark/pull/44690#issuecomment-1888221263

   ## With dynamic allocation on
   
   ### 1 GPU
   
   - GPU discovery script

   Create a `gpu_discovery_1_gpu.sh` script with the following content and make it executable:
   
   ```bash
   #!/bin/bash
   # Fake GPU discovery script: reports a single GPU with address "0".
   cat <<EOF
   {"name": "gpu","addresses":["0"]}
   EOF
   ```
   
   - spark-shell configurations
   
   ```bash
   spark-shell --master yarn \
     --num-executors=1 \
     --conf spark.executor.cores=4 \
     --conf spark.task.cpus=1 \
     --files `pwd`/gpu_discovery_1_gpu.sh \
     --conf spark.dynamicAllocation.maxExecutors=1 \
     --conf spark.dynamicAllocation.enabled=true
   ```
   
   With the spark-shell configurations above, dynamic allocation is enabled and an initial executor with 4 CPU cores is launched. Since each task requires 1 CPU core under the default resource profile, up to 4 tasks can run concurrently.
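
   As a quick sanity check, the expected concurrency for the default profile can be derived from the submitted configuration. Below is a minimal sketch (not part of the original test) that can be pasted into the same spark-shell session; it only reads the standard Spark config keys set above:

   ```scala
   // Minimal sketch: expected task concurrency under the default resource profile,
   // assuming the spark-shell flags above (spark.executor.cores=4, spark.task.cpus=1).
   val executorCores = sc.getConf.getInt("spark.executor.cores", 1)
   val taskCpus = sc.getConf.getInt("spark.task.cpus", 1)
   val slotsPerExecutor = executorCores / taskCpus
   println(s"Default-profile slots per executor: $slotsPerExecutor") // expected: 4
   ```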
   
   - test code
   
   ```scala
   import org.apache.spark.TaskContext
   import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

   // First stage: 6 tasks under the default resource profile, so no GPU resource is visible.
   val rdd = sc.range(0, 100, 1, 6).mapPartitions { iter =>
     val tc = TaskContext.get()
     assert(!tc.resources().contains("gpu"))
     iter
   }

   // Custom resource profile: executors with 8 cores and 1 GPU,
   // each task requiring 1 CPU core and 0.125 GPU.
   val ereqs = new ExecutorResourceRequests().cores(8).resource("gpu", 1, "./gpu_discovery_1_gpu.sh")
   val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.125)
   val rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build

   // Second stage: 12 tasks under the custom profile; every task should see GPU address "0".
   val rdd1 = rdd.repartition(12).withResources(rp).mapPartitions { iter =>
     Thread.sleep(1000)
     val tc = TaskContext.get()
     assert(tc.resources()("gpu").addresses sameElements Array("0"))
     iter
   }
   rdd1.collect()
   ```
   
   The provided Spark job is split into two stages. The first stage comprises 6 tasks, each requiring 1 CPU core under the default resource profile, so the first 4 tasks run concurrently, followed by the remaining 2 tasks.
   
   
   
![0](https://github.com/apache/spark/assets/1320706/ec89cc2e-da14-4b0f-a704-934541d9636f)
   
   
   The second stage comprises 12 tasks that run under a distinct resource profile: executors with 8 cores and 1 GPU, and each task requiring 1 CPU core and 0.125 GPU. As a result, the first 8 tasks execute concurrently, followed by the remaining 4 tasks.
   
   
   
![1](https://github.com/apache/spark/assets/1320706/38289c7c-8576-47ed-81fb-4628acd059f1)
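
   For reference, here is a minimal sketch of the slot math behind that behavior; it is plain arithmetic using the values from the resource profile above, not an API the test relies on:

   ```scala
   // Slots per executor under the custom profile are limited by whichever
   // resource runs out first.
   val coresPerExecutor = 8      // ExecutorResourceRequests.cores(8)
   val cpusPerTask = 1           // TaskResourceRequests.cpus(1)
   val gpusPerExecutor = 1.0     // ExecutorResourceRequests.resource("gpu", 1, ...)
   val gpuPerTask = 0.125        // TaskResourceRequests.resource("gpu", 0.125)

   val cpuSlots = coresPerExecutor / cpusPerTask       // 8
   val gpuSlots = (gpusPerExecutor / gpuPerTask).toInt // 8
   val concurrentTasks = math.min(cpuSlots, gpuSlots)  // 8 of the 12 tasks at a time
   ```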
   
   
   ### 2 GPUs
   
   - GPU discovery script

   Create a `gpu_discovery_2_gpus.sh` script with the following content and make it executable:
   
   ```bash
   #!/bin/bash
   # Fake GPU discovery script: reports two GPUs with addresses "0" and "1".
   cat <<EOF
   {"name": "gpu","addresses":["0", "1"]}
   EOF
   ```
   
   - spark-shell configurations
   
   ```bash
   spark-shell --master yarn \
     --num-executors=1 \
     --conf spark.executor.cores=4 \
     --conf spark.task.cpus=1 \
     --files `pwd`/gpu_discovery_2_gpus.sh \
     --conf spark.dynamicAllocation.maxExecutors=1 \
     --conf spark.dynamicAllocation.enabled=true
   ```
   
   With the spark-shell configurations above, dynamic allocation is enabled and an initial executor with 4 CPU cores is launched. Since each task requires 1 CPU core under the default resource profile, up to 4 tasks can run concurrently.
   
   - test code
   
   ```scala
   import org.apache.spark.TaskContext
   import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

   // First stage: 6 tasks under the default resource profile, so no GPU resource is visible.
   val rdd = sc.range(0, 100, 1, 6).mapPartitions { iter =>
     val tc = TaskContext.get()
     assert(!tc.resources().contains("gpu"))
     iter
   }

   // Custom resource profile: executors with 8 cores and 2 GPUs,
   // each task requiring 1 CPU core and 0.25 GPU.
   val ereqs = new ExecutorResourceRequests().cores(8).resource("gpu", 2, "./gpu_discovery_2_gpus.sh")
   val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.25)
   val rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build

   // Second stage: 8 tasks under the custom profile; partitions 0-3 should be assigned
   // GPU "0" and partitions 4-7 GPU "1".
   val rdd1 = rdd.repartition(8).withResources(rp).mapPartitions { iter =>
     Thread.sleep(1000)
     val tc = TaskContext.get()
     val tid = tc.partitionId()
     if (tid >= 4) {
       assert(tc.resources()("gpu").addresses sameElements Array("1"))
     } else {
       assert(tc.resources()("gpu").addresses sameElements Array("0"))
     }
     iter
   }
   rdd1.collect()
   ```
   
   The provided Spark job is split into two stages. The first stage comprises 6 tasks, each requiring 1 CPU core under the default resource profile, so the first 4 tasks run concurrently, followed by the remaining 2 tasks.
    
   
![2](https://github.com/apache/spark/assets/1320706/411f39be-ad81-44af-8a13-809d3bae40a8)
   
   
   
   The second stage comprises 8 tasks that run under a distinct resource profile: executors with 8 cores and 2 GPUs, and each task requiring 1 CPU core and 0.25 GPU. As a result, all 8 tasks execute concurrently; the first 4 tasks are assigned GPU 0 and the remaining 4 tasks GPU 1, which the assertions in the test code verify.
   
   
![3](https://github.com/apache/spark/assets/1320706/ac93a6ef-7928-4ee5-8992-d1ec27ca24ac)
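
   To inspect that address split directly rather than relying only on the assertions, a small variation of the second-stage code (a sketch reusing the `rdd` and `rp` values defined above; not part of the original test) could collect each task's partition id together with the GPU address it received:

   ```scala
   // Minimal sketch: report which GPU address each second-stage task was given.
   val assignments = rdd.repartition(8).withResources(rp).mapPartitions { iter =>
     val tc = TaskContext.get()
     Iterator((tc.partitionId(), tc.resources()("gpu").addresses.mkString(",")))
   }.collect()

   assignments.sortBy(_._1).foreach { case (pid, addrs) =>
     println(s"partition $pid -> gpu(s) $addrs") // expected: 0-3 -> "0", 4-7 -> "1"
   }
   ```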
   
   

