Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21977#discussion_r208310790
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -60,14 +61,26 @@ private[spark] object PythonEvalType {
      */
     private[spark] abstract class BasePythonRunner[IN, OUT](
         funcs: Seq[ChainedPythonFunctions],
    -    bufferSize: Int,
    -    reuseWorker: Boolean,
         evalType: Int,
    -    argOffsets: Array[Array[Int]])
    +    argOffsets: Array[Array[Int]],
    +    conf: SparkConf)
       extends Logging {
     
       require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")
     
    +  private val bufferSize = conf.getInt("spark.buffer.size", 65536)
     +  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
    +  private val memoryMb = {
    +    val allocation = conf.get(PYSPARK_EXECUTOR_MEMORY)
    +    if (reuseWorker) {
    --- End diff --
    
    That `useDaemon` flag controls whether worker.py is called directly from 
Spark or from a daemon process that forks new workers. The daemon process is 
used because [forking from a huge JVM is more 
expensive](https://github.com/apache/spark/blob/628c7b517969c4a7ccb26ea67ab3dd61266073ca/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L102-L104).
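
    To make that concrete, the decision is roughly the following shape (a simplified sketch of my reading, not the actual `PythonWorkerFactory` code; the class and conf handling here are placeholders, and I'm assuming the flag comes from a key like `spark.python.use.daemon`):

    ```scala
    // Simplified sketch of the useDaemon decision (hypothetical names, not
    // the real PythonWorkerFactory API).
    class WorkerFactorySketch(conf: Map[String, String]) {
      private val useDaemon: Boolean =
        conf.getOrElse("spark.python.use.daemon", "true").toBoolean

      def createWorker(): Process = {
        if (useDaemon) {
          // Fork workers from a small, long-lived Python daemon instead of
          // forking from the (much larger) JVM for every task.
          new ProcessBuilder("python", "-m", "pyspark.daemon").start()
        } else {
          // Launch worker.py directly; simpler, but pays the full process
          // startup cost each time.
          new ProcessBuilder("python", "-m", "pyspark.worker").start()
        }
      }
    }
    ```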
    
    This isn't related to REUSE_WORKER, which is [used by the forked worker 
process](https://github.com/apache/spark/blob/628c7b517969c4a7ccb26ea67ab3dd61266073ca/python/pyspark/daemon.py#L112).
 If reuse is set, then the process [keeps the worker 
alive](https://github.com/apache/spark/blob/628c7b517969c4a7ccb26ea67ab3dd61266073ca/python/pyspark/daemon.py#L173)
 and will call its main method again.
    
    But it looks like my understanding of `spark.python.worker.reuse` is still incorrect, given that configuration's description: "Reuse Python worker or not. If yes, it will use a fixed number of Python workers, does not need to fork() a Python process for every task." That sounds like reuse means workers stay around across multiple tasks, not that concurrent tasks run in the same worker.
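
    In other words, my current understanding is roughly this (a toy sketch only, not Spark code):

    ```scala
    // Conceptual sketch: reuse keeps one worker per core alive across
    // sequential tasks; it does not make concurrent tasks share one worker.
    final case class Worker(id: Int) {
      def run(task: String): Unit = println(s"worker $id ran $task")
    }

    class WorkerPoolSketch(cores: Int, reuse: Boolean) {
      private val pool = Array.tabulate(cores)(Worker(_))   // one long-lived worker per core

      def runTask(core: Int, task: String): Unit = {
        val worker = if (reuse) pool(core) else Worker(core) // fresh worker per task when reuse is off
        worker.run(task)
      }
    }
    ```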
    
    I'll do some more digging to see what determines the number of workers. My guess is that it's the number of executor cores, in which case the memory allocation should always be divided by the number of cores.

