Github user WeichenXu123 commented on the issue:
https://github.com/apache/spark/pull/19904
I discussed this with @MrBago offline; here is a summary of my current thinking.
I describe 3 approaches we can compare. After the discussion I realized none of
them is ideal, so we have to make a trade-off.
## Approach 1
The approach proposed by @MrBago at
https://github.com/apache/spark/pull/19904#discussion_r156751569
This approach resolves the model-object reference issue and allows the model
objects to be GCed in time. **BUT, in some cases, it still does not resolve the
O(N) model memory occupation issue.** Let me use an extreme case to describe it:
suppose we set `parallelism = 1` and there are 100 paramMaps, so we have
100 fitting & evaluation tasks. In this approach, because `parallelism = 1`,
the code has to wait for all 100 fitting tasks to complete **(at which point the
memory occupied by models already reaches 100 * sizeof(model))**, and only then
will it unpersist the training dataset and run the 100 evaluation tasks.
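To make the peak-memory claim concrete, here is a toy, self-contained sketch (plain Scala, not Spark code; `Approach1Sim` and `peakLiveModels` are hypothetical names) of approach 1's schedule with `parallelism = 1`: every fit completes before any evaluation starts, so every model is alive at once.

```scala
// Toy simulation of approach 1's scheduling with parallelism = 1.
// No Spark involved: a "model" is just a counter increment.
object Approach1Sim {
  // Returns the peak number of models held in memory at once.
  def peakLiveModels(numParamMaps: Int): Int = {
    var live = 0
    var peak = 0
    // Phase 1: fit one model per paramMap (strictly sequential when parallelism = 1).
    for (_ <- 0 until numParamMaps) {
      live += 1                      // est.fit(...) would produce a Model here
      peak = math.max(peak, live)
    }
    // trainingDataset.unpersist() would run here, only after ALL fits finish.
    // Phase 2: evaluate each model and release it.
    for (_ <- 0 until numParamMaps) {
      live -= 1                      // model becomes GC-able after its evaluation
    }
    peak
  }
}
```

With 100 paramMaps the simulated peak is 100 live models, which is the O(N) occupation described above.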
## Approach 2
This approach is my current PR code.
It guarantees that, in any case, the peak memory occupied by models is
`O(numParallelism * sizeof(model))`. But it has the issue that, in some extreme
cases, the "unpersist training dataset" step is delayed until most of the
evaluation tasks complete. Suppose `parallelism = 1` and there are 100
fitting & evaluation tasks; each fitting & evaluation task has to be executed
one by one, so "unpersist training dataset" is triggered only after the first
99 fitting & evaluation tasks and the 100th fitting task complete.
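The delayed-unpersist effect can be sketched the same way (again a toy simulation, not Spark code; `Approach2Sim` and `simulate` are hypothetical names): each task fits then evaluates, the model is released after its evaluation, and unpersist is triggered by the last fit.

```scala
// Toy simulation of approach 2's scheduling with parallelism = 1.
object Approach2Sim {
  // Returns (peak live models, number of evaluation tasks that finish
  // before "unpersist training dataset" is triggered).
  def simulate(numParamMaps: Int): (Int, Int) = {
    var live = 0
    var peak = 0
    var fitsDone = 0
    var evalsBeforeUnpersist = 0
    var unpersisted = false
    for (_ <- 0 until numParamMaps) {
      live += 1                              // fit: model comes into memory
      peak = math.max(peak, live)
      fitsDone += 1
      if (fitsDone == numParamMaps) unpersisted = true // last fit triggers unpersist
      if (!unpersisted) evalsBeforeUnpersist += 1      // this eval ran before unpersist
      live -= 1                              // model released after its evaluation
    }
    (peak, evalsBeforeUnpersist)
  }
}
```

With 100 tasks the simulated peak is only 1 live model, but 99 of the 100 evaluations run before the training dataset would be unpersisted, matching the extreme case above.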
## Approach 3
After comparing approach 1 and approach 2, I realized that, in the case where
parallelism is low but there are many fitting & evaluation tasks, we cannot
achieve both of the following goals:
- keep the peak memory occupied by models at O(parallelism * sizeof(model))
- unpersist the training dataset before most of the evaluation tasks start.

So I vote for a simpler approach: move the "unpersist training dataset" step to
the end (does this really matter?).
Goal 1 is more important: we must keep the peak memory occupied by models at
O(parallelism * sizeof(model)), otherwise there is a high risk of OOM.
Like the following code:
```
val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
  Future[Double] {
    val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
    // ...other minor code
    val metric = eval.evaluate(model.transform(validationDataset, paramMap))
    logDebug(s"Got metric $metric for model trained with $paramMap.")
    metric
  } (executionContext)
}
val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
trainingDataset.unpersist() // <------- unpersist at the end
validationDataset.unpersist()
```
Gentle ping @jkbradley @MrBago @sethah @BryanCutler @holdenk