wbo4958 commented on PR #44690:
URL: https://github.com/apache/spark/pull/44690#issuecomment-1888218752
# Manual test on Spark Yarn Cluster
## Environment
The internal Spark Yarn environment which supports GPU resources.
## With dynamic allocation off
### 1 GPU
- script to discover GPU

Create a `gpu_discovery_1_gpu.sh`:

```bash
#!/usr/bin/env bash
cat <<EOF
{"name": "gpu", "addresses": ["0"]}
EOF
```
- spark-submit configurations
```bash
spark-shell --master yarn \
--num-executors=1 \
--conf spark.executor.cores=8 \
--conf spark.task.cpus=1 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=0.125 \
--conf spark.executor.resource.gpu.discoveryScript=./gpu_discovery_1_gpu.sh \
--files `pwd`/gpu_discovery_1_gpu.sh \
--conf spark.dynamicAllocation.enabled=false
```
The aforementioned spark-submit configuration launches a single executor
with 8 CPU cores and 1 GPU. Each task requires 1 CPU core and 0.125 GPUs,
allowing 8 tasks to execute concurrently.
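As a rough sanity check, the expected concurrency can be sketched as the minimum of the CPU-based and GPU-based slot counts (a simplified model of the cap, not the scheduler's actual code):

```scala
// Simplified model (assumption): concurrency is capped by whichever
// resource runs out first.
val cpuSlots = 8 / 1                 // spark.executor.cores / spark.task.cpus
val gpuSlots = (1.0 / 0.125).toInt   // executor gpu amount / task gpu amount
val concurrentTasks = math.min(cpuSlots, gpuSlots)
assert(concurrentTasks == 8)
```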
- test code
```scala
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
val rdd = sc.range(0, 100, 1, 12).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
assert(tc.resources()("gpu").addresses sameElements Array("0"))
iter
}}
val rdd1 = rdd.repartition(2)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
val rp = new ResourceProfileBuilder().require(treqs).build
val rdd2 = rdd1.withResources(rp).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
assert(tc.resources()("gpu").addresses sameElements Array("0"))
iter
}
}
rdd2.collect()
```
The provided Spark job will be split into two stages. The first stage
comprises 12 tasks, each requiring 1 CPU core and 0.125 GPUs. As a result, the
first 8 tasks run concurrently, followed by the remaining 4.

In contrast, the second stage consists of 2 tasks, each requiring 1 CPU
core and 0.6 GPUs. Consequently, only one task can run at a time, so the
2 tasks execute sequentially.
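The stage-2 slot count can be sanity-checked the same way (simplified arithmetic, not the scheduler's actual code):

```scala
// With 1 GPU in total and 0.6 GPUs requested per task, only one
// 0.6-GPU task fits on the GPU at any given time.
val stage2SlotsPerGpu = (1.0 / 0.6).toInt
assert(stage2SlotsPerGpu == 1)
```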

### 2 GPUs
- script to discover GPU

Create a `gpu_discovery_2_gpus.sh`:

```bash
#!/usr/bin/env bash
cat <<EOF
{"name": "gpu", "addresses": ["0", "1"]}
EOF
```
- spark-submit configurations
```bash
spark-shell --master yarn \
--num-executors=1 \
--conf spark.executor.cores=8 \
--conf spark.task.cpus=1 \
--conf spark.executor.resource.gpu.amount=2 \
--conf spark.task.resource.gpu.amount=0.25 \
--conf spark.executor.resource.gpu.discoveryScript=./gpu_discovery_2_gpus.sh \
--files `pwd`/gpu_discovery_2_gpus.sh \
--conf spark.dynamicAllocation.enabled=false
```
The aforementioned spark-submit configuration launches a single executor
with 8 CPU cores and 2 GPUs. Each task requires 1 CPU core and 0.25 GPUs,
allowing 8 tasks to execute concurrently. The first 4 tasks grab GPU 0, while
the remaining 4 grab GPU 1, due to the round-robin manner in which the
resources are offered.
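The observed 4 + 4 address split can be reproduced with a toy capacity table. This is a hypothetical model of the offer loop, not Spark's actual implementation:

```scala
import scala.collection.mutable

// Each GPU starts with capacity 1.0; each task requests 0.25.
// Assigning each task to the first GPU with enough remaining capacity
// reproduces the observed 4 + 4 split across the two addresses.
val taskAmount = 0.25
val remaining = mutable.LinkedHashMap("0" -> 1.0, "1" -> 1.0)
val assigned = (0 until 8).map { _ =>
  val addr = remaining.collectFirst { case (a, cap) if cap >= taskAmount => a }.get
  remaining(addr) -= taskAmount
  addr
}
assert(assigned == Seq("0", "0", "0", "0", "1", "1", "1", "1"))
```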
- test code
```scala
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
val rdd = sc.range(0, 100, 1, 8).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
if (tid >= 4) {
assert(tc.resources()("gpu").addresses sameElements Array("1"))
} else {
assert(tc.resources()("gpu").addresses sameElements Array("0"))
}
iter
}}
val rdd1 = rdd.repartition(2)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
val rp = new ResourceProfileBuilder().require(treqs).build
val rdd2 = rdd1.withResources(rp).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
if (tid > 0) {
assert(tc.resources()("gpu").addresses sameElements Array("1"))
} else {
assert(tc.resources()("gpu").addresses sameElements Array("0"))
}
iter
}
}
rdd2.collect()
```
The provided Spark job will be split into two stages. The first stage
comprises 8 tasks, each requiring 1 CPU core and 0.25 GPUs, so all 8 tasks
run concurrently. The first 4 tasks grab GPU 0, while the remaining 4 grab
GPU 1, due to the round-robin manner in which the resources are offered;
the assert lines verify this policy.

In contrast, the second stage consists of 2 tasks, each requiring 1 CPU
core and 0.6 GPUs. Since 2 GPUs are available, both tasks run concurrently,
each grabbing a different GPU; the assert lines verify that as well.

### concurrent spark jobs
This test case is to ensure the other spark job can still grab the left gpu
resources and run alongside the other spark job.
- script to discover GPU

Create a `gpu_discovery_2_gpus.sh` (same as above):

```bash
#!/usr/bin/env bash
cat <<EOF
{"name": "gpu", "addresses": ["0", "1"]}
EOF
```
- spark-submit configurations
```bash
spark-shell --master yarn \
--num-executors=1 \
--conf spark.executor.cores=8 \
--conf spark.task.cpus=1 \
--conf spark.executor.resource.gpu.amount=2 \
--conf spark.task.resource.gpu.amount=0.25 \
--conf spark.executor.resource.gpu.discoveryScript=./gpu_discovery_2_gpus.sh \
--files `pwd`/gpu_discovery_2_gpus.sh \
--conf spark.dynamicAllocation.enabled=false
```
The aforementioned spark-submit configuration launches a single executor
with 8 CPU cores and 2 GPUs. Each task requires 1 CPU core and 0.25 GPUs,
allowing 8 tasks to execute concurrently. The first 4 tasks grab GPU 0, while
the remaining 4 grab GPU 1, due to the round-robin manner in which the
resources are offered.
- test code
```scala
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
// Submit Spark Job 0 in thread1.
val thread1 = new Thread(() => {
val rdd = sc.range(0, 8, 1, 8).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
if (tid >= 4) {
assert(tc.resources()("gpu").addresses sameElements Array("1"))
} else {
assert(tc.resources()("gpu").addresses sameElements Array("0"))
}
iter
}
}
val rdd1 = rdd.repartition(2)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
val rp = new ResourceProfileBuilder().require(treqs).build
val rdd2 = rdd1.withResources(rp).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
assert(tc.resources()("gpu").addresses sameElements Array(tid.toString))
println("sleeping 20s")
Thread.sleep(20000)
iter
}
}
rdd2.collect()
})
thread1.start()
// Sleep 5s in the main thread to make sure the result tasks launched
// in thread1 are running.
Thread.sleep(5000)
// Submit Spark Job 1 in the main thread.
// Each result task in thread1 takes 0.6 GPUs, so only 0.4 GPUs are left
// on each GPU. Since the default task gpu amount is 0.25, the number of
// concurrent tasks in Spark Job 1 will be (0.4 / 0.25).toInt * 2 (2 GPUs) = 2.
val rdd = sc.range(0, 4, 1, 2).mapPartitions(iter => {
Thread.sleep(10000)
val tc = TaskContext.get()
val tid = tc.partitionId()
if (tid % 2 == 1) {
assert(tc.resources()("gpu").addresses sameElements Array("1"))
} else {
assert(tc.resources()("gpu").addresses sameElements Array("0"))
}
iter
})
rdd.collect()
thread1.join()
```
The given Spark application consists of two spark jobs. The first spark job
0 is submitted in thread1, while the second spark job 1 is submitted in the
main thread. To guarantee that the spark job 0 runs prior to the spark job 1, a
`sleep 5s` is included in the main thread. As a result, the result tasks in
spark job 0 will pause for 20 seconds to await the completion of spark job 1.
This is done to test whether spark job 1 can utilize the remaining GPU and
execute concurrently with spark job 0.
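The expected concurrency for spark job 1 can be sketched from the leftover capacity (simplified arithmetic, not Spark's scheduler code):

```scala
// Job 0 holds 0.6 of each GPU, so 0.4 is left per GPU.
// With the default task gpu amount of 0.25, one extra task fits per GPU.
val leftoverPerGpu = 1.0 - 0.6
val extraSlotsPerGpu = (leftoverPerGpu / 0.25).toInt
val concurrentJob1Tasks = extraSlotsPerGpu * 2   // 2 GPUs
assert(concurrentJob1Tasks == 2)
```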
**Event timeline** *(screenshots omitted below)*

**spark job 0**

**spark job 1**
---
If we change the request to `val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)`
so that each task requires 1 whole GPU, then spark job 1 will not grab any
GPUs, because no GPU capacity is left while spark job 0 is running.
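The blocking behavior follows from the same leftover arithmetic (a simplified model):

```scala
// Job 0 now holds 1.0 of each GPU, leaving nothing for
// job 1's default 0.25-GPU requests.
val leftoverPerGpu = 1.0 - 1.0
val extraSlotsPerGpu = (leftoverPerGpu / 0.25).toInt
assert(extraSlotsPerGpu == 0)  // job 1 must wait until job 0 releases the GPUs
```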
```scala
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
// Submit Spark Job 0 in thread1.
val thread1 = new Thread(() => {
val rdd = sc.range(0, 8, 1, 8).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
if (tid >= 4) {
assert(tc.resources()("gpu").addresses sameElements Array("1"))
} else {
assert(tc.resources()("gpu").addresses sameElements Array("0"))
}
iter
}
}
val rdd1 = rdd.repartition(2)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
val rp = new ResourceProfileBuilder().require(treqs).build
val rdd2 = rdd1.withResources(rp).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
assert(tc.resources()("gpu").addresses sameElements Array(tid.toString))
println("sleeping 20s")
Thread.sleep(20000)
iter
}
}
rdd2.collect()
})
thread1.start()
// Sleep 5s in the main thread to make sure the result tasks launched
// in thread1 are running.
Thread.sleep(5000)
// Submit Spark Job 1 in the main thread.
// Each result task in thread1 takes 1 whole GPU, so no GPU capacity is
// left for Spark Job 1. Spark Job 1 will run after Spark Job 0 finishes,
// but we can't be sure which GPU each task will grab.
val rdd = sc.range(0, 4, 1, 2).mapPartitions(iter => {
Thread.sleep(10000)
val tc = TaskContext.get()
assert(tc.resources().contains("gpu"))
iter
})
rdd.collect()
thread1.join()
```

From the picture, we can see that spark job 1 was submitted while spark job 0
was running, but its tasks did not run because of a lack of GPU resources.
After spark job 0 finished and released the GPUs, the tasks of spark job 1
grabbed the GPUs and ran.