wbo4958 commented on PR #44690: URL: https://github.com/apache/spark/pull/44690#issuecomment-1968122629
Hi @srowen, for https://github.com/apache/spark/pull/44690#discussion_r1479110884, I deployed a Spark standalone cluster, launched a spark-shell, and ran the test code with:

```scala
spark-shell --master spark://192.168.0.106:7077 --conf spark.executor.cores=20 --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=1 --conf spark.task.resource.gpu.amount=0.05 \
  --conf spark.dynamicAllocation.enabled=false

24/02/28 11:08:10 WARN Utils: Your hostname, spark-bobby resolves to a loopback address: 127.0.1.1; using 192.168.0.106 instead (on interface wlp0s20f3)
24/02/28 11:08:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.0.0-SNAPSHOT
      /_/

Using Scala version 2.13.12 (OpenJDK 64-Bit Server VM, Java 17.0.9)
Type in expressions to have them evaluated.
Type :help for more information.
24/02/28 11:08:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.0.106:4040
Spark context available as 'sc' (master = spark://192.168.0.106:7077, app id = app-20240228110813-0000).
Spark session available as 'spark'.

scala> import org.apache.spark.TaskContext
     | import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
     |
     | val rdd = sc.range(0, 100, 1, 20).mapPartitions { iter => {
     |   val tc = TaskContext.get()
     |   val tid = tc.partitionId()
     |   assert(tc.resources()("gpu").addresses sameElements Array("0"))
     |   iter
     | }}
     |
     | val rdd1 = rdd.repartition(16)
     | val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0/11.0)
     | val rp = new ResourceProfileBuilder().require(treqs).build
     | val rdd2 = rdd1.withResources(rp).mapPartitions { iter => {
     |   val tc = TaskContext.get()
     |   val tid = tc.partitionId()
     |   assert(tc.resources()("gpu").addresses sameElements Array("0"))
     |   iter
     | }
     | }
     | rdd2.collect()
warning: 1 deprecation (since 2.13.3); for details, enable `:setting -deprecation` or `:replay -deprecation`
24/02/28 11:09:36 WARN ResourceUtils: The configuration of cores (exec = 20 task = 1, runnable tasks = 20) will result in wasted resources due to resource gpu limiting the number of runnable tasks per executor to: 11. Please adjust your configuration.
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
val rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[2] at mapPartitions at <console>:4
val rdd1: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[6] at repartition at <console>:11
val treqs: org.apache.spark.resource.TaskResourceRequests = Task resource requests: {cpus=name: cpus, amount: 1.0, gpu=name: gpu, amount: 0.09090909090909091}
val rp: org.apache.spark.resource.ResourceProfile = Profile: id = 1, executor resources: , task resources: cpus -> name: cpus, amount: 1.0,gpu -> name: gpu, amount: 0.09090909090909091
val rdd2: org.apache.sp...
```

As you can see, I used 1.0/11.0 for the GPU resource: `val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0/11.0)`.

Here are the stages. In the first stage, a total of 20 tasks run at the same time; in the second stage, a total of 11 tasks run at the same time. It worked as expected.
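As a side note, the task counts observed in the two stages line up with a simple back-of-the-envelope calculation. The sketch below is purely illustrative (the `slotsPerExecutor` helper is hypothetical, not the actual ResourceUtils code path), but it shows why gpu becomes the limiting resource in the second stage:

```scala
// Illustrative only: derive the per-executor concurrent task limit from the
// executor/task resource amounts used in the test above.
def slotsPerExecutor(executorAmount: Double, taskAmount: Double): Int =
  math.floor(executorAmount / taskAmount + 1e-9).toInt // epsilon guards against fp error on fractional amounts

val cpuSlots       = slotsPerExecutor(20, 1)          // spark.executor.cores=20 / spark.task.cpus=1  -> 20
val gpuSlotsStage1 = slotsPerExecutor(1, 0.05)        // default profile: 1 gpu / 0.05 per task       -> 20
val gpuSlotsStage2 = slotsPerExecutor(1, 1.0 / 11.0)  // custom profile:  1 gpu / (1/11) per task     -> 11

println(math.min(cpuSlots, gpuSlotsStage1)) // first stage:  min(20, 20) = 20 concurrent tasks
println(math.min(cpuSlots, gpuSlotsStage2)) // second stage: min(20, 11) = 11 concurrent tasks, gpu is limiting
```

The second result matches the ResourceUtils warning in the log above ("limiting the number of runnable tasks per executor to: 11").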
