Github user WeichenXu123 commented on the issue:

    https://github.com/apache/spark/pull/19904
  
    I discussed with @MrBago offline; here is a summary of my current thinking:
    I outline 3 approaches for comparison. After the discussion I realized that none of them is ideal; we have to make a trade-off.
    
    ## Approach 1
    The approach proposed by @MrBago at 
https://github.com/apache/spark/pull/19904#discussion_r156751569
    This approach resolves the model-object reference issue, allowing the model objects to be GCed in time. **BUT, in some cases, it still does not resolve the O(N) model memory occupation issue.** Let me use an extreme case to describe it:
    suppose we set `parallelism = 1` and there are 100 paramMaps, so we have 100 fitting & evaluation tasks. In this approach, because `parallelism = 1`, the code has to wait for all 100 fitting tasks to complete **(at this point the memory occupied by models already reaches 100 * sizeof(model))**, and only then will it unpersist the training dataset and run the 100 evaluation tasks.
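    A minimal sketch of this shape (the helper name `fitThenEvaluate` and its signature are mine for illustration, not the linked proposal's exact code):
    ```scala
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    import org.apache.spark.ml.{Estimator, Model}
    import org.apache.spark.ml.evaluation.Evaluator
    import org.apache.spark.ml.param.ParamMap
    import org.apache.spark.sql.Dataset

    // Sketch of approach 1: fitting and evaluation are separate chained
    // futures; the training dataset is unpersisted once *all* fits finish.
    def fitThenEvaluate(
        est: Estimator[_],
        eval: Evaluator,
        epm: Array[ParamMap],
        trainingDataset: Dataset[_],
        validationDataset: Dataset[_])(
        implicit executionContext: ExecutionContext): Array[Double] = {
      val modelFutures = epm.map { paramMap =>
        Future { est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]] }
      }
      // Each evaluation future drops its model reference when it completes,
      // so an evaluated model can be GCed without waiting for the others.
      val metricFutures = modelFutures.zip(epm).map { case (modelFuture, paramMap) =>
        modelFuture.map { model =>
          eval.evaluate(model.transform(validationDataset, paramMap))
        }
      }
      // Unpersist as soon as every fit is done. With parallelism = 1 and 100
      // paramMaps this fires only after all 100 models exist simultaneously.
      Future.sequence(modelFutures.toSeq).onComplete { _ =>
        trainingDataset.unpersist()
      }
      val metrics = metricFutures.map(Await.result(_, Duration.Inf))
      validationDataset.unpersist()
      metrics
    }
    ```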
    
    ## Approach 2
    This approach is my current PR code.
    This approach guarantees that in every case the peak memory occupied by models is `O(numParallelism * sizeof(model))`, but it has an issue: in some extreme cases, the "unpersist training dataset" call is delayed until most of the evaluation tasks complete. Suppose `parallelism = 1` and there are 100 fitting & evaluation tasks; each fitting & evaluation task executes one by one, so the "unpersist training dataset" is triggered only after the first 99 fitting & evaluation tasks and the 100th fitting task complete.
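    A minimal sketch of the counting idea (the `AtomicInteger` bookkeeping and the helper name `fitAndEvaluate` are illustrative; the PR's actual code may differ):
    ```scala
    import java.util.concurrent.atomic.AtomicInteger

    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    import org.apache.spark.ml.{Estimator, Model}
    import org.apache.spark.ml.evaluation.Evaluator
    import org.apache.spark.ml.param.ParamMap
    import org.apache.spark.sql.Dataset

    // Sketch of approach 2: fit and evaluation share one future, so at most
    // `parallelism` models are alive at a time; a countdown unpersists the
    // training dataset after the last fit.
    def fitAndEvaluate(
        est: Estimator[_],
        eval: Evaluator,
        epm: Array[ParamMap],
        trainingDataset: Dataset[_],
        validationDataset: Dataset[_])(
        implicit executionContext: ExecutionContext): Array[Double] = {
      val pendingFits = new AtomicInteger(epm.length)
      val metricFutures = epm.map { paramMap =>
        Future {
          val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
          // The last fit to finish releases the cached training data. With
          // parallelism = 1, this runs only after the first 99 fit+evaluate
          // pairs and the 100th fit are done.
          if (pendingFits.decrementAndGet() == 0) trainingDataset.unpersist()
          eval.evaluate(model.transform(validationDataset, paramMap))
        }
      }
      val metrics = metricFutures.map(Await.result(_, Duration.Inf))
      validationDataset.unpersist()
      metrics
    }
    ```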
    
    ## Approach 3
    After comparing approach 1 and approach 2, I realized that, in the case where parallelism is low but there are many fitting & evaluation tasks, we cannot achieve both of the following goals:
    - keep the peak memory occupied by models at O(parallelism * sizeof(model))
    - unpersist the training dataset before most of the evaluation tasks have started.
    
    So I vote for a simpler approach: move the "unpersist training dataset" call to the end (does this really matter?).
    Goal 1 is the more important one: we must make sure the peak memory occupied by models is O(parallelism * sizeof(model)), otherwise there is a high risk of OOM.
    Like the following code:
    ```scala
          val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
            Future[Double] {
              val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
              // ... other minor code
              val metric = eval.evaluate(model.transform(validationDataset, paramMap))
              logDebug(s"Got metric $metric for model trained with $paramMap.")
              metric
            } (executionContext)
          }
          val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
          trainingDataset.unpersist() // <------- unpersist at the end
          validationDataset.unpersist()
    ```
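    For context on why goal 1 holds in the snippet above: the `executionContext` is what bounds the number of in-flight fits. Roughly (a sketch in the spirit of Spark's `HasParallelism.getExecutionContext`, not the exact code):
    ```scala
    import java.util.concurrent.Executors
    import scala.concurrent.ExecutionContext

    // A pool of `parallelism` threads means at most `parallelism` futures,
    // hence at most that many models, exist at once. (Spark's actual trait
    // also special-cases parallelism == 1 with a same-thread executor.)
    def executionContextFor(parallelism: Int): ExecutionContext =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(parallelism))
    ```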
    
    Gentle ping @jkbradley @MrBago @sethah @BryanCutler @holdenk 


