wbo4958 commented on PR #43030:
URL: https://github.com/apache/spark/pull/43030#issuecomment-1736882909

   # Manual tests
   
   Due to the challenges of conducting yarn application tests within Spark unit 
tests, I took the initiative to manually perform several tests on our internal 
Yarn cluster.
   
   ## With dynamic allocation disabled.
   
   ``` bash
   spark-shell --master yarn --num-executors=1 --executor-cores=4 --conf 
spark.task.cpus=1 \
      --conf spark.dynamicAllocation.enabled=false
   ```
   
   The above command requires 1 executor with 4 CPU cores, and the default 
`task.cpus = 1`, so the default tasks parallelism is 4 at a time.
   
   1. `task.cores=1`
   
   Test code:
   
   ``` scala
   import org.apache.spark.resource.{ResourceProfileBuilder, 
TaskResourceRequests}
   
   val rdd = sc.range(0, 100, 1, 4)
   var rdd1 = rdd.repartition(3)
   
   val treqs = new TaskResourceRequests().cpus(1)
   val rp = new ResourceProfileBuilder().require(treqs).build
   
   rdd1 = rdd1.withResources(rp)
   rdd1.collect()
   ```
   
   When the required `task.cpus=1`, `executor.cores=4` (No executor resource 
specified, use the default one), there will be 4 tasks running for rp.
   
   The entire Spark application consists of a single Spark job that will be 
divided into two stages. The first shuffle stage comprises four tasks, all of 
which will be executed simultaneously.
   
   
![shuffle-stage](https://github.com/apache/spark/assets/1320706/19842abb-98b7-4384-8c3a-0c0d77513c70)
   
   And the second ResultStage comprises 3 tasks, and all of which will be 
executed simultaneously since the required `task.cpus` is  1.
   
   ![result-stage-task 
cores=1](https://github.com/apache/spark/assets/1320706/416ca856-90fd-4493-ab25-71acad669d6b)
   
   2. `task.cores=2`
   
   Test code,
   
   ``` scala
   import org.apache.spark.resource.{ResourceProfileBuilder, 
TaskResourceRequests}
   
   val rdd = sc.range(0, 100, 1, 4)
   var rdd1 = rdd.repartition(3)
   
   val treqs = new TaskResourceRequests().cpus(2)
   val rp = new ResourceProfileBuilder().require(treqs).build
   
   rdd1 = rdd1.withResources(rp)
   rdd1.collect()
   ```
   
   When the required `task.cpus=2`, `executor.cores=4` (No executor resource 
specified, use the default one), there will be 2 tasks running for rp.
   
   The first shuffle stage behaves the same as the first one. 
   
   The second ResultStage comprises 3 tasks, so the first 2 tasks will be 
running at a time, and then execute the last task.
   
   ![result-stage-task 
cores=2](https://github.com/apache/spark/assets/1320706/07fd7033-74fb-4653-9c2a-4a04892aea60)
   
   
   3. `task.cores=3`
   
   Test code,
   
   ``` scala
   import org.apache.spark.resource.{ResourceProfileBuilder, 
TaskResourceRequests}
   
   val rdd = sc.range(0, 100, 1, 4)
   var rdd1 = rdd.repartition(3)
   
   val treqs = new TaskResourceRequests().cpus(3)
   val rp = new ResourceProfileBuilder().require(treqs).build
   
   rdd1 = rdd1.withResources(rp)
   rdd1.collect()
   ```
   
   When the required `task.cpus=3`, `executor.cores=4` (No executor resource 
specified, use the default one), there will be 1 task running for rp.
   
   The first shuffle stage behaves the same as the first one. 
   
   The second ResultStage comprises 3 tasks, all of which will be running 
serially.
   
   ![result-stage-task 
cores=3](https://github.com/apache/spark/assets/1320706/bca6b376-4952-4bbb-bce7-e0a5fe2ca836)
   
   4. `task.cores=5`
   
   ``` scalas
   import org.apache.spark.resource.{ResourceProfileBuilder, 
TaskResourceRequests}
   
   val rdd = sc.range(0, 100, 1, 4)
   var rdd1 = rdd.repartition(3)
   val treqs = new TaskResourceRequests().cpus(5)
   val rp = new ResourceProfileBuilder().require(treqs).build
   
   rdd1 = rdd1.withResources(rp)
   ```
   
   exception happened.
   ``` console
   scala> rdd1 = rdd1.withResources(rp)
   org.apache.spark.SparkException: The number of cores per executor (=4) has 
to be >= the number of cpus per task = 5.
     at 
org.apache.spark.resource.ResourceUtils$.validateTaskCpusLargeEnough(ResourceUtils.scala:412)
     at 
org.apache.spark.resource.ResourceProfile.calculateTasksAndLimitingResource(ResourceProfile.scala:182)
     at 
org.apache.spark.resource.ResourceProfile.$anonfun$limitingResource$1(ResourceProfile.scala:152)
     at scala.Option.getOrElse(Option.scala:189)
     at 
org.apache.spark.resource.ResourceProfile.limitingResource(ResourceProfile.scala:151)
     at 
org.apache.spark.resource.ResourceProfileManager.addResourceProfile(ResourceProfileManager.scala:141)
     at org.apache.spark.rdd.RDD.withResources(RDD.scala:1829)
     ... 50 elided
   
   scala> 
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to