wbo4958 commented on PR #44690:
URL: https://github.com/apache/spark/pull/44690#issuecomment-1888218752

   # Manual test on Spark Yarn Cluster
   
   ## Environment
   
   An internal Spark-on-YARN cluster that supports GPU resources.
   
   ## With dynamic allocation off
   
   ### 1 GPU
   
   - script to discover GPU
   
   Create a `gpu_discovery_1_gpu.sh` and make it executable with `chmod +x gpu_discovery_1_gpu.sh`:
   
   ```bash
   #!/usr/bin/env bash
   # Report a single GPU with address "0" to Spark.
   cat <<EOF
   {"name": "gpu", "addresses": ["0"]}
   EOF
   ```
   
   - spark-submit configurations
   
   ```bash
   spark-shell --master yarn \
     --num-executors=1 \
     --conf spark.executor.cores=8 \
     --conf spark.task.cpus=1 \
     --conf spark.executor.resource.gpu.amount=1 \
     --conf spark.task.resource.gpu.amount=0.125 \
     --conf spark.executor.resource.gpu.discoveryScript=./gpu_discovery_1_gpu.sh \
     --files `pwd`/gpu_discovery_1_gpu.sh \
     --conf spark.dynamicAllocation.enabled=false
   ```
   
   The spark-submit configuration above launches a single executor with 8 CPU cores and 1 GPU. Each task requires 1 CPU core and 0.125 GPUs, allowing 8 tasks to run concurrently (see the sketch below).
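
   As a rough sketch of the slot arithmetic described above (plain Scala, illustrative variable names only, not a Spark API):

   ```scala
   // Illustrative only: the scheduler admits tasks until either the CPU slots
   // or the GPU slots of the executor run out, so concurrency is the minimum.
   val executorCores = 8
   val taskCpus = 1
   val executorGpus = 1.0
   val taskGpuAmount = 0.125

   val cpuSlots = executorCores / taskCpus               // 8
   val gpuSlots = (executorGpus / taskGpuAmount).toInt   // floor(1.0 / 0.125) = 8
   val maxConcurrentTasks = math.min(cpuSlots, gpuSlots) // 8
   ```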
   
   - test code
   
   ```scala
   import org.apache.spark.TaskContext
   import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

   // Stage 1: 12 tasks under the default profile (0.125 GPU each); every task
   // should see the single GPU "0".
   val rdd = sc.range(0, 100, 1, 12).mapPartitions { iter =>
     val tc = TaskContext.get()
     assert(tc.resources()("gpu").addresses sameElements Array("0"))
     iter
   }

   // Stage 2: repartition to 2 tasks and attach a profile requiring 0.6 GPU each.
   val rdd1 = rdd.repartition(2)
   val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
   val rp = new ResourceProfileBuilder().require(treqs).build
   val rdd2 = rdd1.withResources(rp).mapPartitions { iter =>
     val tc = TaskContext.get()
     assert(tc.resources()("gpu").addresses sameElements Array("0"))
     iter
   }
   rdd2.collect()
   ```
   
   The provided Spark job splits into two stages. The first stage comprises 12 tasks, each requiring 1 CPU core and 0.125 GPUs; the first 8 tasks run concurrently and the remaining 4 follow.
   
   
![0](https://github.com/apache/spark/assets/1320706/7f0a9827-b0d0-4c40-b6e3-8aeefb9c3280)
   
   
   In contrast, the second stage consists of 2 tasks, each requiring 1 CPU core and 0.6 GPUs. Since floor(1 / 0.6) = 1, only one task can hold the GPU at a time, so the 2 tasks execute sequentially.
   
   
![1](https://github.com/apache/spark/assets/1320706/ce9ade15-126f-4228-8791-d113390e2c13)
   
   
   ### 2 GPUs
   
   - script to discover GPU
   
   Create a `gpu_discovery_2_gpus.sh` and make it executable with `chmod +x gpu_discovery_2_gpus.sh`:
   
   ```bash
   #!/usr/bin/env bash
   # Report two GPUs with addresses "0" and "1" to Spark.
   cat <<EOF
   {"name": "gpu", "addresses": ["0", "1"]}
   EOF
   ```
   
   - spark-submit configurations
   
   ```bash
   spark-shell --master yarn \
     --num-executors=1 \
     --conf spark.executor.cores=8 \
     --conf spark.task.cpus=1 \
     --conf spark.executor.resource.gpu.amount=2 \
     --conf spark.task.resource.gpu.amount=0.25 \
     --conf spark.executor.resource.gpu.discoveryScript=./gpu_discovery_2_gpus.sh \
     --files `pwd`/gpu_discovery_2_gpus.sh \
     --conf spark.dynamicAllocation.enabled=false
   ```
   
   The spark-submit configuration above launches a single executor with 8 CPU cores and 2 GPUs. Each task requires 1 CPU core and 0.25 GPUs, allowing 8 tasks to run concurrently. The first 4 tasks grab GPU 0 and the remaining 4 grab GPU 1, because the GPU addresses are offered in a round-robin manner (see the sketch below).
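
   A minimal sketch of the per-GPU slot arithmetic (plain Scala with illustrative names; it mirrors the assignment the asserts below check, not the scheduler's internal implementation):

   ```scala
   // Each GPU address can host floor(1.0 / taskGpuAmount) tasks at a time.
   def slotsPerGpu(taskGpuAmount: Double): Int = (1.0 / taskGpuAmount).toInt

   val gpus = Seq("0", "1")
   val stage1Slots = slotsPerGpu(0.25) // 4 per GPU -> 8 concurrent tasks in total
   val stage2Slots = slotsPerGpu(0.6)  // 1 per GPU -> 2 concurrent tasks in total

   // With 4 slots per GPU, tasks 0-3 land on "0" and tasks 4-7 on "1",
   // which is exactly what the asserts in the test code check.
   val expectedStage1 = (0 until 8).map(tid => gpus(tid / stage1Slots))
   ```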
   
   - test code
   
   ```scala
   import org.apache.spark.TaskContext
   import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

   // Stage 1: 8 tasks at 0.25 GPU each; tasks 0-3 should get GPU "0",
   // tasks 4-7 should get GPU "1".
   val rdd = sc.range(0, 100, 1, 8).mapPartitions { iter =>
     val tc = TaskContext.get()
     val tid = tc.partitionId()
     if (tid >= 4) {
       assert(tc.resources()("gpu").addresses sameElements Array("1"))
     } else {
       assert(tc.resources()("gpu").addresses sameElements Array("0"))
     }
     iter
   }

   // Stage 2: 2 tasks at 0.6 GPU each; each task should get a different GPU.
   val rdd1 = rdd.repartition(2)
   val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
   val rp = new ResourceProfileBuilder().require(treqs).build
   val rdd2 = rdd1.withResources(rp).mapPartitions { iter =>
     val tc = TaskContext.get()
     val tid = tc.partitionId()
     if (tid > 0) {
       assert(tc.resources()("gpu").addresses sameElements Array("1"))
     } else {
       assert(tc.resources()("gpu").addresses sameElements Array("0"))
     }
     iter
   }
   rdd2.collect()
   ```
   
   The provided Spark job splits into two stages. The first stage comprises 8 tasks, each requiring 1 CPU core and 0.25 GPUs, so all 8 run concurrently. The first 4 tasks grab GPU 0 and the remaining 4 grab GPU 1 due to the round-robin offering of addresses; the asserts verify this assignment.
    
   
![2](https://github.com/apache/spark/assets/1320706/fb01d0d0-0330-4e9f-80bc-86fa13203dd6)
   
   
   In contrast, the second stage consists of 2 tasks, each requiring 1 CPU core and 0.6 GPUs. Since 2 GPUs are available and each can host floor(1 / 0.6) = 1 such task, both tasks run concurrently, each grabbing a different GPU; the asserts verify this.
   
   
![3](https://github.com/apache/spark/assets/1320706/95ffca92-2deb-41b9-8996-900d6ad77842)
   
   
   ### Concurrent Spark jobs

   This test case ensures that a second Spark job can grab the leftover GPU resources and run alongside the first job.
   
   - script to discover GPU
   
   Create a `gpu_discovery_2_gpus.sh` (the same script as in the previous test) and make it executable:
   
   ```bash
   #!/usr/bin/env bash
   # Report two GPUs with addresses "0" and "1" to Spark.
   cat <<EOF
   {"name": "gpu", "addresses": ["0", "1"]}
   EOF
   ```
   
   - spark-submit configurations
   
   ```bash
   spark-shell --master yarn \
     --num-executors=1 \
     --conf spark.executor.cores=8 \
     --conf spark.task.cpus=1 \
     --conf spark.executor.resource.gpu.amount=2 \
     --conf spark.task.resource.gpu.amount=0.25 \
     --conf spark.executor.resource.gpu.discoveryScript=./gpu_discovery_2_gpus.sh \
     --files `pwd`/gpu_discovery_2_gpus.sh \
     --conf spark.dynamicAllocation.enabled=false
   ```
   
   The spark-submit configuration above launches a single executor with 8 CPU cores and 2 GPUs. Each task requires 1 CPU core and 0.25 GPUs, allowing 8 tasks to run concurrently; the first 4 tasks grab GPU 0 and the remaining 4 grab GPU 1 due to the round-robin offering of addresses.
   
   - test code
   
   ```scala
   import org.apache.spark.TaskContext
   import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

   // Submit Spark job 0 in thread1.
   val thread1 = new Thread(() => {
     // Stage 1: 8 tasks at 0.25 GPU each; tasks 0-3 should get GPU "0",
     // tasks 4-7 should get GPU "1".
     val rdd = sc.range(0, 8, 1, 8).mapPartitions { iter =>
       val tc = TaskContext.get()
       val tid = tc.partitionId()
       if (tid >= 4) {
         assert(tc.resources()("gpu").addresses sameElements Array("1"))
       } else {
         assert(tc.resources()("gpu").addresses sameElements Array("0"))
       }
       iter
     }

     val rdd1 = rdd.repartition(2)

     val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
     val rp = new ResourceProfileBuilder().require(treqs).build

     // Stage 2: 2 tasks at 0.6 GPU each; task 0 should get GPU "0" and task 1
     // GPU "1". Sleep so that job 1 can run while these tasks hold their GPUs.
     val rdd2 = rdd1.withResources(rp).mapPartitions { iter =>
       val tc = TaskContext.get()
       val tid = tc.partitionId()
       assert(tc.resources()("gpu").addresses sameElements Array(tid.toString))
       println("sleeping 20s")
       Thread.sleep(20000)
       iter
     }
     rdd2.collect()
   })

   thread1.start()
   // Sleep 5s in the main thread to make sure the result tasks launched in
   // thread1 are running.
   Thread.sleep(5000)

   // Submit Spark job 1 in the main thread.
   // Each result task in thread1 holds 0.6 GPU, so only 0.4 of each GPU is left.
   // With the default task GPU amount of 0.25, job 1 runs
   // floor(0.4 / 0.25) = 1 task per GPU * 2 GPUs = 2 tasks concurrently.
   val rdd = sc.range(0, 4, 1, 2).mapPartitions { iter =>
     Thread.sleep(10000)
     val tc = TaskContext.get()
     val tid = tc.partitionId()
     if (tid % 2 == 1) {
       assert(tc.resources()("gpu").addresses sameElements Array("1"))
     } else {
       assert(tc.resources()("gpu").addresses sameElements Array("0"))
     }
     iter
   }
   rdd.collect()

   thread1.join()
   ```
   
   The given Spark application consists of two Spark jobs. Job 0 is submitted in thread1, while job 1 is submitted in the main thread; a 5-second sleep in the main thread guarantees that job 0 starts first. The result tasks of job 0 then sleep for 20 seconds while still holding their GPU fractions, which tests whether job 1 can pick up the remaining GPU capacity and run concurrently with job 0.
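
   The expected concurrency for job 1 follows from the same fractional arithmetic (a plain-Scala sketch with illustrative names):

   ```scala
   // While a job-0 result task holds 0.6 of a GPU, only 0.4 of that GPU remains.
   val gpuCapacity = 1.0
   val job0TaskGpu = 0.6
   val job1TaskGpu = 0.25 // the default spark.task.resource.gpu.amount above

   val leftPerGpu = gpuCapacity - job0TaskGpu             // 0.4
   val job1SlotsPerGpu = (leftPerGpu / job1TaskGpu).toInt // floor(0.4 / 0.25) = 1
   val job1Concurrency = job1SlotsPerGpu * 2              // 1 slot * 2 GPUs = 2
   ```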
   
   **Event timeline**
   
   
![5](https://github.com/apache/spark/assets/1320706/d3b44a41-8ce2-4b6b-9c45-03f9cf4598e9)
   
   
   **Spark job 0**
   
![6_1](https://github.com/apache/spark/assets/1320706/9a4d33e1-6829-416f-8dc5-ae8528364317)
   
   
![6_2](https://github.com/apache/spark/assets/1320706/a6ea193c-9fe6-457b-9e54-38a74f2a45ca)
   
   
   
   **Spark job 1**
   
   
![7](https://github.com/apache/spark/assets/1320706/f2ceef9a-760e-463c-b8a4-e6adbb865c46)
   
   
   
   ---
   
   If we change the request to `val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)`, i.e. require 1 whole GPU per task, then Spark job 1 cannot grab any GPU while job 0 is running, because each GPU has 1.0 - 1.0 = 0 capacity left.
   
   ```scala
   import org.apache.spark.TaskContext
   import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

   // Submit Spark job 0 in thread1.
   val thread1 = new Thread(() => {
     // Stage 1: 8 tasks at 0.25 GPU each; tasks 0-3 should get GPU "0",
     // tasks 4-7 should get GPU "1".
     val rdd = sc.range(0, 8, 1, 8).mapPartitions { iter =>
       val tc = TaskContext.get()
       val tid = tc.partitionId()
       if (tid >= 4) {
         assert(tc.resources()("gpu").addresses sameElements Array("1"))
       } else {
         assert(tc.resources()("gpu").addresses sameElements Array("0"))
       }
       iter
     }

     val rdd1 = rdd.repartition(2)

     // This time each stage-2 task requires a whole GPU.
     val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
     val rp = new ResourceProfileBuilder().require(treqs).build

     val rdd2 = rdd1.withResources(rp).mapPartitions { iter =>
       val tc = TaskContext.get()
       val tid = tc.partitionId()
       assert(tc.resources()("gpu").addresses sameElements Array(tid.toString))
       println("sleeping 20s")
       Thread.sleep(20000)
       iter
     }
     rdd2.collect()
   })

   thread1.start()
   // Sleep 5s in the main thread to make sure the result tasks launched in
   // thread1 are running.
   Thread.sleep(5000)

   // Submit Spark job 1 in the main thread.
   // Each result task in thread1 holds 1 whole GPU, so no GPU capacity is left
   // for job 1. Job 1 runs only after job 0 finishes, and we cannot predict
   // which GPU each task will grab.
   val rdd = sc.range(0, 4, 1, 2).mapPartitions { iter =>
     Thread.sleep(10000)
     val tc = TaskContext.get()
     assert(tc.resources().contains("gpu"))
     iter
   }
   rdd.collect()

   thread1.join()
   ```
   
   
![8](https://github.com/apache/spark/assets/1320706/be721b49-1cb8-4498-ad43-e98feda8d156)
   
   
   From the picture, we can see that Spark job 1 was submitted while Spark job 0 was running, but its tasks did not start because no GPU resources were available. Once job 0 finished and released the GPUs, the job-1 tasks grabbed them and ran.
   
   

