wbo4958 commented on PR #43323: URL: https://github.com/apache/spark/pull/43323#issuecomment-1756553207
# Manual tests

Because Kubernetes application tests are difficult to run inside Spark unit tests, I manually ran several tests on a local Kubernetes cluster.

I followed the https://jaceklaskowski.github.io/spark-kubernetes-book/demo/spark-shell-on-minikube/ tutorial to set up a local Kubernetes cluster.

``` bash
# Start from a clean minikube cluster and point Docker at minikube's daemon
minikube delete
minikube start
eval $(minikube -p minikube docker-env)

# Build the Spark image from this PR inside minikube
cd $SPARK_HOME
./bin/docker-image-tool.sh \
  -m \
  -t pr_k8s \
  build

# Create and switch to the demo namespace
eval $(minikube -p minikube docker-env)
kubectl create ns spark-demo
kubens spark-demo
cd $SPARK_HOME

# API server URL used as the Spark master
K8S_SERVER=$(kubectl config view --output=jsonpath='{.clusters[].cluster.server}')
```

## With dynamic allocation disabled.

``` bash
./bin/spark-shell --master k8s://$K8S_SERVER \
  --conf spark.kubernetes.container.image=spark:pr_k8s \
  --conf spark.kubernetes.context=minikube \
  --conf spark.kubernetes.namespace=spark-demo \
  --verbose \
  --num-executors=1 \
  --conf spark.executor.cores=4 \
  --conf spark.task.cpus=1 \
  --conf spark.dynamicAllocation.enabled=false
```

The above command requests 1 executor with 4 CPU cores and sets the default `task.cpus=1`, so by default up to 4 tasks run at a time.

1. `task.cpus=1`

Test code:

``` scala
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(1)
val rp = new ResourceProfileBuilder().require(treqs).build

rdd1 = rdd1.withResources(rp)

rdd1.collect()
```

With the required `task.cpus=1` and `executor.cores=4` (no executor resources are specified in the profile, so the default is used), up to 4 tasks can run at a time for `rp`.

The entire Spark application consists of a single Spark job that is divided into two stages. The first stage, a shuffle stage, comprises 4 tasks, all of which are executed simultaneously. The second stage, the ResultStage, comprises 3 tasks, all of which are also executed simultaneously since the required `task.cpus` is 1.

2. `task.cpus=2`

Test code:

``` scala
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(2)
val rp = new ResourceProfileBuilder().require(treqs).build

rdd1 = rdd1.withResources(rp)

rdd1.collect()
```

With the required `task.cpus=2` and `executor.cores=4` (no executor resources are specified in the profile, so the default is used), up to 2 tasks can run at a time for `rp`.

The first shuffle stage behaves the same as in the first test. The second stage, the ResultStage, comprises 3 tasks: the first 2 tasks run at the same time, and then the last task is executed.

3. `task.cpus=3`

Test code:

``` scala
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(3)
val rp = new ResourceProfileBuilder().require(treqs).build

rdd1 = rdd1.withResources(rp)

rdd1.collect()
```

With the required `task.cpus=3` and `executor.cores=4` (no executor resources are specified in the profile, so the default is used), only 1 task can run at a time for `rp`.

The first shuffle stage behaves the same as in the first test. The second stage, the ResultStage, comprises 3 tasks, all of which run serially.

4. `task.cpus=5`: an exception is thrown, because the task requests more CPUs than the executor has cores.

``` scala
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(5)
val rp = new ResourceProfileBuilder().require(treqs).build

rdd1 = rdd1.withResources(rp)

rdd1.collect()
```

``` console
scala> import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
     |
     | val rdd = sc.range(0, 100, 1, 4)
     | var rdd1 = rdd.repartition(3)
     | val treqs = new TaskResourceRequests().cpus(5)
     | val rp = new ResourceProfileBuilder().require(treqs).build
     |
     | rdd1 = rdd1.withResources(rp)
     |
     | rdd1.collect()
warning: 1 deprecation (since 2.13.3); for details, enable `:setting -deprecation` or `:replay -deprecation`
org.apache.spark.SparkException: The number of cores per executor (=4) has to be >= the number of cpus per task = 5.
  at org.apache.spark.resource.ResourceUtils$.validateTaskCpusLargeEnough(ResourceUtils.scala:412)
  at org.apache.spark.resource.ResourceProfile.calculateTasksAndLimitingResource(ResourceProfile.scala:182)
  at org.apache.spark.resource.ResourceProfile.$anonfun$limitingResource$1(ResourceProfile.scala:152)
  at scala.Option.getOrElse(Option.scala:201)
  at org.apache.spark.resource.ResourceProfile.limitingResource(ResourceProfile.scala:151)
  at org.apache.spark.resource.ResourceProfileManager.addResourceProfile(ResourceProfileManager.scala:141)
  at org.apache.spark.rdd.RDD.withResources(RDD.scala:1829)
  ... 42 elided
```
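
The results of the four tests are consistent with a simple rule: when CPU is the only limiting resource, the number of tasks an executor can run at a time is `executor.cores / task.cpus` (integer division), and a profile whose `task.cpus` exceeds `executor.cores` is rejected. The following is a minimal, Spark-free sketch of that arithmetic, not Spark's actual implementation. `TaskSlots`, `concurrentTasks`, and `waves` are hypothetical names introduced only for illustration, and "waves" is a simplification: the real scheduler launches a task whenever a slot frees up.

``` scala
// Minimal sketch, assuming CPU is the only limiting resource.
// TaskSlots, concurrentTasks and waves are hypothetical names, not Spark APIs.
object TaskSlots {
  // Number of tasks that fit on one executor at the same time.
  def concurrentTasks(executorCores: Int, taskCpus: Int): Int = {
    require(taskCpus > 0, "taskCpus must be positive")
    if (taskCpus > executorCores) {
      // Same condition as the one reported in the stack trace of test 4.
      throw new IllegalArgumentException(
        s"The number of cores per executor (=$executorCores) has to be >= " +
          s"the number of cpus per task = $taskCpus.")
    }
    executorCores / taskCpus // integer division: leftover cores stay idle
  }

  // Simplified number of sequential rounds needed to run numTasks tasks.
  def waves(numTasks: Int, slots: Int): Int = (numTasks + slots - 1) / slots

  def main(args: Array[String]): Unit = {
    val executorCores = 4    // spark.executor.cores in the tests above
    val resultStageTasks = 3 // partitions after repartition(3)
    for (taskCpus <- Seq(1, 2, 3)) {
      val slots = concurrentTasks(executorCores, taskCpus)
      println(s"task.cpus=$taskCpus -> $slots concurrent task(s), " +
        s"${waves(resultStageTasks, slots)} round(s) for $resultStageTasks tasks")
    }
  }
}
```

With `executorCores = 4`, the sketch yields 4, 2, and 1 concurrent tasks for `task.cpus` of 1, 2, and 3, matching tests 1 to 3, while `concurrentTasks(4, 5)` throws, matching test 4.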
