wbo4958 commented on PR #43494:
URL: https://github.com/apache/spark/pull/43494#issuecomment-1873931486
# Manual test on Spark Standalone Cluster
## Environment
The Spark Standalone cluster consists of a single worker node equipped with
8 CPU cores but lacks physical GPUs. However, Spark is capable of managing GPU
resources by utilizing GPU IDs instead of actual GPUs. To achieve this, you can
configure the `spark.worker.resource.gpu.discoveryScript` setting with a script
that can retrieve the GPU IDs. For instance,
```bash
cat <<EOF
{"name": "gpu","addresses":["0", "1"]}
EOF
```
By changing the `addresses` list in this script, we can simulate 1 GPU, 2 GPUs, or any number of GPUs.
## with dynamic allocation off
### 1 GPU
- configurations
Add the configurations below to `SPARK_HOME/conf/spark-defaults.conf`:
``` xml
spark.worker.resource.gpu.amount 1
spark.worker.resource.gpu.discoveryScript /tmp/gpu_discovery.sh
```
- spark-submit configurations
```bash
spark-shell --master spark://192.168.0.103:7077 \
  --conf spark.executor.cores=8 --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.task.resource.gpu.amount=0.125 \
  --conf spark.dynamicAllocation.enabled=false
```
The spark-submit configurations above launch a single executor with 8 CPU
cores and 1 GPU. Each task requires 1 CPU core and 0.125 GPUs, so 8 tasks can
run concurrently.
- test code
``` scala
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ResourceProfileBuilder,
TaskResourceRequests}
val rdd = sc.range(0, 100, 1, 12).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
assert(tc.resources()("gpu").addresses sameElements Array("0"))
iter
}}
val rdd1 = rdd.repartition(2)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
val rp = new ResourceProfileBuilder().require(treqs).build
val rdd2 = rdd1.withResources(rp).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
assert(tc.resources()("gpu").addresses sameElements Array("0"))
iter
}
}
rdd2.collect()
```
The Spark job above is split into two stages. The first stage comprises 12
tasks, each requiring 1 CPU core and 0.125 GPUs. As a result, the first 8 tasks
run concurrently, followed by the remaining 4 tasks.

In contrast, the second stage consists of 2 tasks, each requiring 1 CPU core
and 0.6 GPUs. Only one task can hold the single GPU at any given time, so the
2 tasks execute sequentially.
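The arithmetic behind these numbers can be sketched in plain Scala, without Spark. This is only an illustrative model, not Spark's scheduler code: the object `GpuSlots` and its method names are hypothetical, and it assumes a fractional task amount in (0, 1] where a task never spans two GPUs.

```scala
// Hypothetical helper, not part of Spark: models how many tasks fit at once.
object GpuSlots {
  // Tasks that fit on ONE GPU. A task cannot span two GPUs, so 0.6 -> 1 task.
  def tasksPerGpu(taskGpuAmount: Double): Int = {
    require(taskGpuAmount > 0 && taskGpuAmount <= 1, "sketch covers (0, 1] only")
    // Small epsilon guards against 1.0 / x landing just below an integer.
    math.floor(1.0 / taskGpuAmount + 1e-9).toInt
  }

  // Concurrency is limited by whichever runs out first: CPU cores or GPU slots.
  def concurrentTasks(executorCores: Int, taskCpus: Int,
                      gpus: Int, taskGpuAmount: Double): Int =
    math.min(executorCores / taskCpus, gpus * tasksPerGpu(taskGpuAmount))

  // Number of scheduling "waves" needed to run numTasks with the given slots.
  def waves(numTasks: Int, slots: Int): Int = (numTasks + slots - 1) / slots
}

// Stage 1: 12 tasks at 0.125 GPUs each on 1 GPU and 8 cores.
val stage1Slots = GpuSlots.concurrentTasks(8, 1, 1, 0.125)
println(s"stage 1: $stage1Slots slots, ${GpuSlots.waves(12, stage1Slots)} waves")

// Stage 2: 2 tasks at 0.6 GPUs each on the same executor.
val stage2Slots = GpuSlots.concurrentTasks(8, 1, 1, 0.6)
println(s"stage 2: $stage2Slots slots, ${GpuSlots.waves(2, stage2Slots)} waves")
```

Under this model, stage 1 gets 8 slots and needs 2 waves, while stage 2 gets 1 slot and also needs 2 waves, which matches the behavior described above.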

### 2 GPUs
- configurations
Add the configurations below to `SPARK_HOME/conf/spark-defaults.conf`:
``` xml
spark.worker.resource.gpu.amount 2
spark.worker.resource.gpu.discoveryScript /tmp/gpu_discovery.sh
```
- spark-submit configurations
```bash
spark-shell --master spark://192.168.0.103:7077 \
  --conf spark.executor.cores=8 --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=2 \
  --conf spark.task.resource.gpu.amount=0.25 \
  --conf spark.dynamicAllocation.enabled=false
```
The spark-submit configurations above launch a single executor with 8 CPU
cores and 2 GPUs. Each task requires 1 CPU core and 0.25 GPUs, so 8 tasks can
run concurrently. The first 4 tasks grab GPU 0, while the remaining 4 tasks
grab GPU 1, due to the round-robin manner in which resources are offered.
- test code
```scala
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ResourceProfileBuilder,
TaskResourceRequests}
val rdd = sc.range(0, 100, 1, 8).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
if (tid >= 4) {
assert(tc.resources()("gpu").addresses sameElements Array("1"))
} else {
assert(tc.resources()("gpu").addresses sameElements Array("0"))
}
iter
}}
val rdd1 = rdd.repartition(2)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
val rp = new ResourceProfileBuilder().require(treqs).build
val rdd2 = rdd1.withResources(rp).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
if (tid > 0) {
assert(tc.resources()("gpu").addresses sameElements Array("1"))
} else {
assert(tc.resources()("gpu").addresses sameElements Array("0"))
}
iter
}
}
rdd2.collect()
```
The Spark job above is split into two stages. The first stage comprises 8
tasks, each requiring 1 CPU core and 0.25 GPUs. As a result, all 8 tasks can
run concurrently. The first 4 tasks grab GPU 0, while the remaining 4 tasks
grab GPU 1, due to the round-robin manner in which resources are offered. The
assert lines verify this policy.

In contrast, the second stage consists of 2 tasks, each requiring 1 CPU core
and 0.6 GPUs. Since there are 2 GPUs available, both tasks can run
concurrently, each grabbing a different GPU. The assert lines verify that too.
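The address pattern that the asserts check can be reproduced with a small standalone simulation. This is a sketch under stated assumptions, not Spark's actual assignment code: the object `GpuAssignment` is hypothetical, tasks are assumed to be offered resources in partition order, and each task is assumed to take the first GPU address that still has enough capacity. Amounts are tracked in integer thousandths (an assumption of this sketch) to avoid floating-point drift.

```scala
// Hypothetical simulation, not Spark code: which GPU address does each task get?
object GpuAssignment {
  private val Scale = 1000L // one whole GPU, in thousandths

  private def units(amount: Double): Long = math.round(amount * Scale)

  // Returns, per task, Some(address) or None if no GPU has enough capacity left.
  def assign(addresses: Seq[String], taskGpuAmount: Double,
             numTasks: Int): Seq[Option[String]] = {
    val need = units(taskGpuAmount)
    val remaining = Array.fill(addresses.length)(Scale) // capacity left per GPU
    (0 until numTasks).map { _ =>
      val idx = remaining.indexWhere(_ >= need) // first GPU that still fits the task
      if (idx >= 0) {
        remaining(idx) -= need
        Some(addresses(idx))
      } else None
    }
  }
}

// Stage 1: 8 tasks at 0.25 GPUs each on GPUs "0" and "1".
println(GpuAssignment.assign(Seq("0", "1"), 0.25, 8))
// Stage 2: 2 tasks at 0.6 GPUs each.
println(GpuAssignment.assign(Seq("0", "1"), 0.6, 2))
```

In this model, tasks 0 to 3 land on GPU 0 and tasks 4 to 7 on GPU 1 in the first stage, and the two 0.6-GPU tasks land on different GPUs in the second stage, consistent with the asserts in the test code.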

### concurrent spark jobs
This test case verifies that a second Spark job can still grab the remaining
GPU resources and run alongside the first Spark job.
- configurations
Add the configurations below to `SPARK_HOME/conf/spark-defaults.conf`:
``` xml
spark.worker.resource.gpu.amount 2
spark.worker.resource.gpu.discoveryScript /tmp/gpu_discovery.sh
```
- spark-submit configurations
```bash
spark-shell --master spark://192.168.0.103:7077 \
  --conf spark.executor.cores=8 --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=2 \
  --conf spark.task.resource.gpu.amount=0.25 \
  --conf spark.dynamicAllocation.enabled=false
```
The spark-submit configurations above launch a single executor with 8 CPU
cores and 2 GPUs. Each task requires 1 CPU core and 0.25 GPUs, so 8 tasks can
run concurrently. The first 4 tasks grab GPU 0, while the remaining 4 tasks
grab GPU 1, due to the round-robin manner in which resources are offered.
- test code
```scala
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ResourceProfileBuilder,
TaskResourceRequests}
// Submit Spark Job 0 in thread1.
val thread1 = new Thread(() => {
val rdd = sc.range(0, 8, 1, 8).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
if (tid >= 4) {
assert(tc.resources()("gpu").addresses sameElements Array("1"))
} else {
assert(tc.resources()("gpu").addresses sameElements Array("0"))
}
iter
}
}
val rdd1 = rdd.repartition(2)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
val rp = new ResourceProfileBuilder().require(treqs).build
val rdd2 = rdd1.withResources(rp).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
assert(tc.resources()("gpu").addresses sameElements
Array(tid.toString))
println("sleeping 20s")
Thread.sleep(20000)
iter
}
}
rdd2.collect()
})
thread1.start()
// Sleep 5s in the main thread to make sure the Spark result tasks launched
// in thread1 are running.
Thread.sleep(5000)
// Submit Spark Job 1 in main thread.
// Each Spark result task in thread1 takes 0.6 GPUs, so only 0.4 GPUs are
// left on each GPU.
// Since the default task GPU amount is 0.25, the number of concurrent tasks
// in Spark Job 1 is floor(0.4 / 0.25) = 1 per GPU, times 2 GPUs = 2.
val rdd = sc.range(0, 4, 1, 2).mapPartitions(iter => {
Thread.sleep(10000)
val tc = TaskContext.get()
val tid = tc.partitionId()
if (tid % 2 == 1) {
assert(tc.resources()("gpu").addresses sameElements Array("1"))
} else {
assert(tc.resources()("gpu").addresses sameElements Array("0"))
}
iter
})
rdd.collect()
thread1.join()
```
The Spark application above consists of two Spark jobs. Spark job 0 is
submitted in thread1, while Spark job 1 is submitted in the main thread. To
ensure that Spark job 0 starts before Spark job 1, the main thread sleeps for
5 seconds. The result tasks in Spark job 0 sleep for 20 seconds, so they are
still holding their GPUs while Spark job 1 runs. This tests whether Spark job 1
can use the remaining GPU capacity and execute concurrently with Spark job 0.
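The leftover-capacity arithmetic from the code comments can be checked with a short standalone sketch. The object `LeftoverGpu` is a hypothetical helper, not a Spark API, and it assumes exactly one long-running task is holding a share of each GPU. Amounts are kept in integer thousandths (an assumption of this sketch) so that 1 - 0.6 does not pick up floating-point error.

```scala
// Hypothetical helper, not Spark code: capacity left for a second job.
object LeftoverGpu {
  private val Scale = 1000L // one whole GPU, in thousandths

  private def units(amount: Double): Long = math.round(amount * Scale)

  // How many tasks of size otherTaskGpu still fit on a GPU on which a
  // running task already holds heldPerGpu.
  def extraTasksPerGpu(heldPerGpu: Double, otherTaskGpu: Double): Long = {
    require(otherTaskGpu > 0, "task GPU amount must be positive")
    (Scale - units(heldPerGpu)) / units(otherTaskGpu)
  }
}

val gpus = 2
// Job 0 result tasks hold 0.6 of each GPU; job 1 tasks need 0.25 each.
println(LeftoverGpu.extraTasksPerGpu(0.6, 0.25) * gpus)
// For contrast: if job 0 tasks held a whole GPU each, nothing would be left.
println(LeftoverGpu.extraTasksPerGpu(1.0, 0.25) * gpus)
```

With 0.6 GPUs held on each of the 2 GPUs, this model leaves room for one 0.25-GPU task per GPU, so 2 tasks of Spark job 1 can run alongside Spark job 0.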
**Event timeline**

From the event timeline, we can see that Spark job 1 ran alongside Spark job 0
and finished before Spark job 0.
**spark job 0**


**spark job 1**

---
If we change the request to
`val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)`, so that
each task requires 1 whole GPU, then Spark job 1 cannot grab any GPUs, because
no GPU capacity is left while Spark job 0 is running.
```scala
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ResourceProfileBuilder,
TaskResourceRequests}
// Submit Spark Job 0 in thread1.
val thread1 = new Thread(() => {
val rdd = sc.range(0, 8, 1, 8).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
if (tid >= 4) {
assert(tc.resources()("gpu").addresses sameElements Array("1"))
} else {
assert(tc.resources()("gpu").addresses sameElements Array("0"))
}
iter
}
}
val rdd1 = rdd.repartition(2)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
val rp = new ResourceProfileBuilder().require(treqs).build
val rdd2 = rdd1.withResources(rp).mapPartitions { iter => {
val tc = TaskContext.get()
val tid = tc.partitionId()
assert(tc.resources()("gpu").addresses sameElements
Array(tid.toString))
println("sleeping 20s")
Thread.sleep(20000)
iter
}
}
rdd2.collect()
})
thread1.start()
// Sleep 5s in the main thread to make sure the Spark result tasks launched
// in thread1 are running.
Thread.sleep(5000)
// Submit Spark Job 1 in main thread.
// Each Spark result task in thread1 takes 1 GPU, so no GPUs are left
// for Spark Job 1.
// Spark Job 1 will run after Spark Job 0 has finished, but we can't
// be sure which GPU each task will grab.
val rdd = sc.range(0, 4, 1, 2).mapPartitions(iter => {
Thread.sleep(10000)
val tc = TaskContext.get()
assert(tc.resources().contains("gpu"))
iter
})
rdd.collect()
thread1.join()
```

From the event timeline, we can see that Spark job 1 was submitted while Spark
job 0 was running, but the tasks of Spark job 1 did not run because no GPU
resources were available. Once Spark job 0 finished and released the GPUs, the
tasks of Spark job 1 could grab them and run.