Repository: spark
Updated Branches:
  refs/heads/master 0bf1a74a7 -> fba03133d


[SPARK-22707][ML] Optimize CrossValidator memory occupation by models in fitting

## What changes were proposed in this pull request?

Via some testing I found that CrossValidator still has a memory issue: it
occupies `O(n*sizeof(model))` memory to hold models while fitting, whereas a
well-optimized implementation should only need `O(parallelism*sizeof(model))`.

This is because each `modelFuture` holds a reference to its model object even
after the future completes (it can still be fetched via
`future.value.get.get`), and both the `Future.sequence` result and the
`modelFutures` array hold references to every model future. So all model
objects stay reachable, and fitting still occupies `O(n*sizeof(model))` memory.
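To make the retention concrete, here is a minimal standalone sketch (plain
Scala, not Spark code; the byte-array `Model` stand-in and the 1 MB size are
made up for illustration) showing that a completed `Future` still pins its
result:

```
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

implicit val ec: ExecutionContext = ExecutionContext.global

final class Model(val payload: Array[Byte]) // stand-in for a fitted model

// "Fit" 100 models of ~1 MB each, in parallel futures.
val modelFutures = (1 to 100).map { _ =>
  Future { new Model(new Array[Byte](1 << 20)) }
}
modelFutures.foreach(Await.ready(_, Duration.Inf))

// Even after completion, each model stays reachable through its future
// (future.value.get.get), so while `modelFutures` is referenced none of
// the 100 models can be garbage collected.
val stillHeld = modelFutures.map(_.value.get.get)
println(s"${stillHeld.size} models still referenced")
```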

I fix this by merging `modelFuture` and `foldMetricFuture` into a single
future, and using an `AtomicInteger` to count completed fitting tasks so that
`trainingDataset.unpersist` is triggered once all of them are done.
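As a rough sketch of that counting mechanism (assumed code, not the exact
patch that was merged; `epm`, `est`, `eval`, the datasets, and
`executionContext` are the surrounding CrossValidator names), it could look
like:

```
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future

val numCompletedFits = new AtomicInteger(0)
val foldMetricFutures = epm.map { paramMap =>
  Future[Double] {
    val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
    // The last fitting task to finish unpersists the training data; the
    // model stays local to this future, so it can be GCed right after
    // its evaluation is done.
    if (numCompletedFits.incrementAndGet() == epm.length) {
      trainingDataset.unpersist()
    }
    eval.evaluate(model.transform(validationDataset, paramMap))
  } (executionContext)
}
```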

I commented on this issue once before, on the old PR [SPARK-19357]
https://github.com/apache/spark/pull/16774#pullrequestreview-53674264
but unfortunately, at that time I did not realize the issue still existed. Now
I have confirmed it and created this PR to fix it.

## Discussion
I list 3 approaches below for comparison; after discussion I realized none of
them is ideal, so we have to make a trade-off.

**After discussion with jkbradley, approach 3 was chosen.**

### Approach 1
~~The approach proposed by MrBago at~~
https://github.com/apache/spark/pull/19904#discussion_r156751569
~~This approach resolves the model-reference issue and allows the model
objects to be GCed in time. **BUT, in some cases, it still does not resolve
the O(N) model memory occupation issue.** Let me use an extreme case to
describe it:~~
~~Suppose we set `parallelism = 1` and there are 100 paramMaps, giving 100
fitting & evaluation tasks. In this approach, because `parallelism = 1`, the
code has to wait for all 100 fitting tasks to complete **(at which point the
memory occupied by models already reaches 100 * sizeof(model))**; only then
does it unpersist the training dataset and run the 100 evaluation tasks.~~
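A tiny standalone experiment (three made-up tasks on a hand-built
single-thread pool, not Spark code) illustrates the queuing behavior behind
this: chained evaluation continuations are enqueued only as their fits
complete, so they all land behind the fits already waiting in the queue:

```
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Single-threaded pool to simulate parallelism = 1.
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

val fits = (1 to 3).map { i => Future { println(s"fit $i"); i } }
// Each evaluation is a continuation of its fit, queued only once that
// fit completes, hence behind every fit still in the queue.
val evals = fits.map(_.map(i => println(s"eval $i")))
// Prints all three "fit" lines before any "eval" line, meaning all
// three "models" (the Ints here) are alive at once.
```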

### Approach 2
~~This approach is the old version of this PR's code~~
https://github.com/apache/spark/pull/19904/commits/2cc7c28f385009570536690d686f2843485942b2
~~This approach guarantees that in every case the peak memory occupied by
models is `O(numParallelism * sizeof(model))`, but it has the issue that, in
some extreme cases, the "unpersist training dataset" step is delayed until
most of the evaluation tasks complete. Suppose `parallelism = 1` and there are
100 fitting & evaluation tasks; each fitting & evaluation task has to execute
one by one, so "unpersist training dataset" is triggered only after the first
99 fitting & evaluation tasks and the 100th fitting task complete.~~

### Approach 3
After comparing approach 1 and approach 2, I realized that when parallelism is
low but there are many fitting & evaluation tasks, we cannot achieve both of
the following goals:
- keep the peak memory occupied by models (driver-side) at O(parallelism *
sizeof(model))
- unpersist the training dataset before most of the evaluation tasks start

So I vote for the simpler approach: move the unpersist of the training dataset
to the end (does this really matter?). Goal 1 is more important: we must make
sure the peak memory occupied by models (driver-side) is O(parallelism *
sizeof(model)), otherwise there is a high risk of OOM.
The code looks like the following:
```
      val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
        Future[Double] {
          val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
          // ...other minor code
          val metric = eval.evaluate(model.transform(validationDataset, paramMap))
          logDebug(s"Got metric $metric for model trained with $paramMap.")
          metric
        } (executionContext)
      }
      val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
      trainingDataset.unpersist() // <------- unpersist at the end
      validationDataset.unpersist()
```

## How was this patch tested?

N/A

Author: WeichenXu <weichen...@databricks.com>

Closes #19904 from WeichenXu123/fix_cross_validator_memory_issue.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fba03133
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fba03133
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fba03133

Branch: refs/heads/master
Commit: fba03133d1369ce8cfebd6ae68b987a95f1b7ea1
Parents: 0bf1a74
Author: WeichenXu <weichen...@databricks.com>
Authored: Sun Dec 24 22:57:53 2017 -0800
Committer: Joseph K. Bradley <jos...@databricks.com>
Committed: Sun Dec 24 22:57:53 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/ml/tuning/CrossValidator.scala    | 17 +++--------------
 1 file changed, 3 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fba03133/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
index 1682ca9..0130b3e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
@@ -147,24 +147,12 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
       logDebug(s"Train split $splitIndex with multiple sets of parameters.")
 
       // Fit models in a Future for training in parallel
-      val modelFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
-        Future[Model[_]] {
+      val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
+        Future[Double] {
           val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
-
           if (collectSubModelsParam) {
             subModels.get(splitIndex)(paramIndex) = model
           }
-          model
-        } (executionContext)
-      }
-
-      // Unpersist training data only when all models have trained
-      Future.sequence[Model[_], Iterable](modelFutures)(implicitly, executionContext)
-        .onComplete { _ => trainingDataset.unpersist() } (executionContext)
-
-      // Evaluate models in a Future that will calulate a metric and allow model to be cleaned up
-      val foldMetricFutures = modelFutures.zip(epm).map { case (modelFuture, paramMap) =>
-        modelFuture.map { model =>
           // TODO: duplicate evaluator to take extra params from input
           val metric = eval.evaluate(model.transform(validationDataset, paramMap))
           logDebug(s"Got metric $metric for model trained with $paramMap.")
@@ -174,6 +162,7 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
 
       // Wait for metrics to be calculated before unpersisting validation dataset
       val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
+      trainingDataset.unpersist()
       validationDataset.unpersist()
       foldMetrics
     }.transpose.map(_.sum / $(numFolds)) // Calculate average metric over all splits

