Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21977#discussion_r208310790
--- Diff:
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -60,14 +61,26 @@ private[spark] object PythonEvalType {
*/
private[spark] abstract class BasePythonRunner[IN, OUT](
funcs: Seq[ChainedPythonFunctions],
- bufferSize: Int,
- reuseWorker: Boolean,
evalType: Int,
- argOffsets: Array[Array[Int]])
+ argOffsets: Array[Array[Int]],
+ conf: SparkConf)
extends Logging {
require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")
+ private val bufferSize = conf.getInt("spark.buffer.size", 65536)
+ private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+ private val memoryMb = {
+ val allocation = conf.get(PYSPARK_EXECUTOR_MEMORY)
+ if (reuseWorker) {
--- End diff ---
That `useDaemon` flag controls whether worker.py is called directly from
Spark or from a daemon process that forks new workers. The daemon process is
used because [forking from a huge JVM is more
expensive](https://github.com/apache/spark/blob/628c7b517969c4a7ccb26ea67ab3dd61266073ca/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L102-L104).
This isn't related to REUSE_WORKER, which is [used by the forked worker
process](https://github.com/apache/spark/blob/628c7b517969c4a7ccb26ea67ab3dd61266073ca/python/pyspark/daemon.py#L112).
If reuse is set, then the process [keeps the worker
alive](https://github.com/apache/spark/blob/628c7b517969c4a7ccb26ea67ab3dd61266073ca/python/pyspark/daemon.py#L173)
and will call its main method again.
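The fork/reuse behavior described above can be illustrated with a toy sketch. This is NOT PySpark's actual daemon.py; `worker_main`, the task iterator, and the `task * 2` body are all hypothetical stand-ins for worker.py's main loop. The point it shows: without reuse, a forked worker handles a single task and exits, while with reuse the same process loops and serves subsequent tasks.

```python
def worker_main(task_queue, reuse_worker):
    """Drain tasks from task_queue; reuse_worker keeps the process alive.

    Toy model only: each "task" is an int, and "running" it just doubles
    the value. In the real daemon, the parent forks and the child runs
    worker.py's main; with reuse, main is invoked again for the next task.
    """
    results = []
    tasks = iter(task_queue)
    while True:
        try:
            task = next(tasks)
        except StopIteration:
            break  # stream exhausted; worker shuts down
        results.append(task * 2)  # stand-in for executing the Python UDF
        if not reuse_worker:
            break  # daemon would fork a fresh worker for the next task
    return results
```

With `reuse_worker=True` a single process drains every task; with `False` each fork handles exactly one task before exiting.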
But it looks like my understanding of `spark.python.worker.reuse` is still incorrect, given that configuration's description: "Reuse Python worker or not. If yes, it will use a fixed number of Python workers, does not need to fork() a Python process for every task." That sounds like reuse means workers stay around for multiple tasks, not that concurrent tasks run in the same worker. I'll do some more digging to see what determines the number of workers. My guess is that it is the number of cores, in which case the memory allocation should always be divided by the number of cores.
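If that guess holds, the per-worker share would work out roughly as below. This is a hypothetical helper, not code from this PR; the helper name, the `executor_cores` parameter, and the even integer split are all assumptions made for illustration.

```python
def per_worker_memory_mb(pyspark_memory_mb, executor_cores):
    """Guessed split: roughly one daemon-forked worker per core, so the
    pyspark memory allocation is divided evenly across the cores."""
    # Integer division is an assumption; the actual rounding behavior
    # would depend on how the PR computes the per-worker limit.
    return pyspark_memory_mb // executor_cores
```

For example, a 4 GiB pyspark memory allocation on a 4-core executor would give each worker a 1 GiB share under this guess.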
---