[GitHub] spark pull request #21977: [SPARK-25004][CORE] Add spark.executor.pyspark.me...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r213882447

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -62,14 +63,20 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
     funcs: Seq[ChainedPythonFunctions],
-    bufferSize: Int,
-    reuseWorker: Boolean,
     evalType: Int,
     argOffsets: Array[Array[Int]])
   extends Logging {

   require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")

+  private val conf = SparkEnv.get.conf
+  private val bufferSize = conf.getInt("spark.buffer.size", 65536)
+  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+  // each python worker gets an equal part of the allocation. the worker pool will grow to the
+  // number of concurrent tasks, which is determined by the number of cores in this executor.
+  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+    .map(_ / conf.getInt("spark.executor.cores", 1))
--- End diff --

Oh, it's fine. I meant to fix them together if there are more changes to push. Not a big deal.
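For readers skimming the thread: the line under discussion splits the executor-wide PySpark allocation evenly across the Python worker pool, which can grow to one worker per core. A minimal sketch of that arithmetic, using hypothetical numbers (4096 MiB and 4 cores are not values from the PR):

```scala
object PerWorkerShare extends App {
  // Hypothetical settings: spark.executor.pyspark.memory = 4096 MiB,
  // spark.executor.cores = 4. Neither value comes from the PR itself.
  val pysparkMemoryMb: Option[Long] = Some(4096L)
  val executorCores: Int = 4

  // The worker pool grows to one Python worker per concurrent task (one per
  // core), so each worker gets an equal slice of the executor-wide allocation.
  val perWorkerMb: Option[Long] = pysparkMemoryMb.map(_ / executorCores)

  println(perWorkerMb) // Some(1024): each worker may use up to 1024 MiB
}
```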
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r213777162

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -62,14 +63,20 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
     funcs: Seq[ChainedPythonFunctions],
-    bufferSize: Int,
-    reuseWorker: Boolean,
     evalType: Int,
     argOffsets: Array[Array[Int]])
   extends Logging {

   require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")

+  private val conf = SparkEnv.get.conf
+  private val bufferSize = conf.getInt("spark.buffer.size", 65536)
+  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+  // each python worker gets an equal part of the allocation. the worker pool will grow to the
+  // number of concurrent tasks, which is determined by the number of cores in this executor.
+  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+    .map(_ / conf.getInt("spark.executor.cores", 1))
--- End diff --

@HyukjinKwon, sorry but it looks like this was merged before I could push a commit to update it.
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21977
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r213407352

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -62,14 +63,20 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
     funcs: Seq[ChainedPythonFunctions],
-    bufferSize: Int,
-    reuseWorker: Boolean,
     evalType: Int,
     argOffsets: Array[Array[Int]])
   extends Logging {

   require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")

+  private val conf = SparkEnv.get.conf
+  private val bufferSize = conf.getInt("spark.buffer.size", 65536)
+  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+  // each python worker gets an equal part of the allocation. the worker pool will grow to the
+  // number of concurrent tasks, which is determined by the number of cores in this executor.
+  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+    .map(_ / conf.getInt("spark.executor.cores", 1))
--- End diff --

Sure, thanks for taking the time to clarify it.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r213192352

--- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala ---
@@ -161,6 +162,11 @@ abstract class BaseYarnClusterSuite
     }
     extraJars.foreach(launcher.addJar)

+    if (outFile.isDefined) {
--- End diff --

To me, either way is fine.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r213192210

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -62,14 +63,20 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
     funcs: Seq[ChainedPythonFunctions],
-    bufferSize: Int,
-    reuseWorker: Boolean,
     evalType: Int,
     argOffsets: Array[Array[Int]])
   extends Logging {

   require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")

+  private val conf = SparkEnv.get.conf
+  private val bufferSize = conf.getInt("spark.buffer.size", 65536)
+  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+  // each python worker gets an equal part of the allocation. the worker pool will grow to the
+  // number of concurrent tasks, which is determined by the number of cores in this executor.
+  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+    .map(_ / conf.getInt("spark.executor.cores", 1))
--- End diff --

@rdblue, I fixed the site to refer to Databricks's guide. Mind fixing this one if there are more changes to be pushed?
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r213187429

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -91,6 +91,13 @@ private[spark] class Client(
   private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt

+  private val isPython = sparkConf.get(IS_PYTHON_APP)
--- End diff --

Sure, one of them is https://github.com/sparklingpandas/sparklingml
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r213186832

--- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala ---
@@ -161,6 +162,11 @@ abstract class BaseYarnClusterSuite
     }
     extraJars.foreach(launcher.addJar)

+    if (outFile.isDefined) {
--- End diff --

I think the pattern match would be better than the get.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r213122284

--- Diff: docs/configuration.md ---
@@ -179,6 +179,15 @@ of the most common options to set are:
   (e.g. 2g, 8g).

+
+  spark.executor.pyspark.memory
+  Not set
+
+    The amount of memory to be allocated to PySpark in each executor, in MiB
+    unless otherwise specified. If set, PySpark memory for an executor will be
+    limited to this amount. If not set, Spark will not limit Python's memory use.
--- End diff --

I've added "and it is up to the application to avoid exceeding the overhead memory space shared with other non-JVM processes."
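To make the documented setting concrete, here is one way a user might configure the limit alongside the existing executor memory options. The values are illustrative only, and executor-level settings like these normally have to be in place at submit time (e.g. via spark-submit --conf) rather than after the application starts:

```scala
import org.apache.spark.SparkConf

// Hypothetical sizing: 8 GiB JVM heap plus a 2 GiB cap for Python workers.
val conf = new SparkConf()
  .set("spark.executor.memory", "8g")
  .set("spark.executor.pyspark.memory", "2g") // cap PySpark memory per executor
  .set("spark.executor.cores", "4")           // each Python worker then gets roughly 512 MiB
```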
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r213121178

--- Diff: docs/configuration.md ---
@@ -179,6 +179,15 @@ of the most common options to set are:
   (e.g. 2g, 8g).

+
+  spark.executor.pyspark.memory
+  Not set
+
+    The amount of memory to be allocated to PySpark in each executor, in MiB
--- End diff --

I've added "When PySpark is run in YARN, this memory is added to executor resource requests."
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r213035238

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -91,6 +91,13 @@ private[spark] class Client(
   private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt

+  private val isPython = sparkConf.get(IS_PYTHON_APP)
--- End diff --

@holdenk, can you point me to that repo? I'd love to have a look at how you do mixed pipelines.
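A rough sketch of how the pieces discussed here could combine into a YARN container request. The quoted excerpt only shows the overhead calculation and the isPython flag, so the names and numbers below are assumptions for illustration, not the merged Client.scala code:

```scala
object ContainerRequestSketch extends App {
  // Hypothetical values: spark.executor.memory = 8g, spark.executor.pyspark.memory = 2g.
  val executorMemoryMb = 8192
  val overheadFactor   = 0.10  // MEMORY_OVERHEAD_FACTOR
  val overheadMinMb    = 384L  // MEMORY_OVERHEAD_MIN
  val memoryOverheadMb = math.max((overheadFactor * executorMemoryMb).toLong, overheadMinMb).toInt

  val isPython        = true                       // sparkConf.get(IS_PYTHON_APP) in the diff above
  val pysparkMemoryMb = if (isPython) 2048 else 0  // only requested for Python applications

  // Per the doc change discussed above, the PySpark memory is added to the
  // executor's YARN resource request on top of the JVM heap and overhead.
  val containerMemoryMb = executorMemoryMb + memoryOverheadMb + pysparkMemoryMb
  println(s"Requesting $containerMemoryMb MiB per executor container") // 8192 + 819 + 2048 = 11059
}
```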
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r212782657

--- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala ---
@@ -161,6 +162,11 @@ abstract class BaseYarnClusterSuite
     }
     extraJars.foreach(launcher.addJar)

+    if (outFile.isDefined) {
--- End diff --

Like I said, I think `foreach` is a bad practice with options, so I'd rather not change to use it. I'd be happy to change this to a pattern match if you think it is really desirable to get rid of the `.get`.
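The three styles being weighed here, side by side. The launcher method name `redirectOutput` is a stand-in chosen for illustration, not necessarily the call used in the suite:

```scala
import java.io.File

// Fake launcher so the snippet runs on its own; only the Option handling matters here.
class FakeLauncher {
  def redirectOutput(file: File): Unit = println(s"redirecting output to $file")
}

object OptionStyleExample extends App {
  val launcher = new FakeLauncher
  val outFile: Option[File] = Some(new File("/tmp/yarn-cluster-suite.out"))

  // 1) isDefined + .get, as in the diff above:
  if (outFile.isDefined) {
    launcher.redirectOutput(outFile.get)
  }

  // 2) foreach, which removes the .get (holdenk's suggestion):
  outFile.foreach(launcher.redirectOutput)

  // 3) a pattern match, the alternative rdblue offers instead of foreach:
  outFile match {
    case Some(file) => launcher.redirectOutput(file)
    case None       => // nothing to redirect
  }
}
```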
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r212757824

--- Diff: docs/configuration.md ---
@@ -179,6 +179,15 @@ of the most common options to set are:
   (e.g. 2g, 8g).

+
+  spark.executor.pyspark.memory
+  Not set
+
+    The amount of memory to be allocated to PySpark in each executor, in MiB
--- End diff --

We should probably mention that this is added to the executor memory request in YARN mode.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r212760057

--- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala ---
@@ -161,6 +162,11 @@ abstract class BaseYarnClusterSuite
     }
     extraJars.foreach(launcher.addJar)

+    if (outFile.isDefined) {
--- End diff --

If you do a foreach then the `.get` goes away and the code could be a little cleaner, but it's pretty minor.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r212757958

--- Diff: docs/configuration.md ---
@@ -179,6 +179,15 @@ of the most common options to set are:
   (e.g. 2g, 8g).

+
+  spark.executor.pyspark.memory
+  Not set
+
+    The amount of memory to be allocated to PySpark in each executor, in MiB
+    unless otherwise specified. If set, PySpark memory for an executor will be
+    limited to this amount. If not set, Spark will not limit Python's memory use.
--- End diff --

Maybe mention that in this case (unset) it's up to the user to keep Python + system processes in the overhead %.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r212759411

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -91,6 +91,13 @@ private[spark] class Client(
   private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt

+  private val isPython = sparkConf.get(IS_PYTHON_APP)
--- End diff --

Interesting, I'll add this to my example mixed pipeline repo so folks can see this hack.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r212714476

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -114,6 +114,10 @@ package object config {
     .checkValue(_ >= 0, "The off-heap memory size must not be negative")
     .createWithDefault(0)

+  private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
--- End diff --

Yes, it should. I'll fix it.
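The quoted excerpt cuts off right after the ConfigBuilder call. A plausible completion, modeled on how neighboring memory settings in config/package.scala are declared, might look like the sketch below; treat the builder chain as an assumption rather than a quote of the merged code. It would live inside the `org.apache.spark.internal.config` package object, where `ConfigBuilder` and `ByteUnit` are already in scope:

```scala
  // Assumed shape only: an optional size config interpreted in MiB by default.
  private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
    .doc("The amount of memory to be allocated to PySpark in each executor, in MiB " +
      "unless otherwise specified.")
    .bytesConf(ByteUnit.MiB)
    .createOptional
```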