Repository: spark Updated Branches: refs/heads/master 0bf1a74a7 -> fba03133d
[SPARK-22707][ML] Optimize CrossValidator memory occupation by models in fitting ## What changes were proposed in this pull request? Via some test I found CrossValidator still exists memory issue, it will still occupy `O(n*sizeof(model))` memory for holding models when fitting, if well optimized, it should be `O(parallelism*sizeof(model))` This is because modelFutures will hold the reference to model object after future is complete (we can use `future.value.get.get` to fetch it), and the `Future.sequence` and the `modelFutures` array holds references to each model future. So all model object are keep referenced. So it will still occupy `O(n*sizeof(model))` memory. I fix this by merging the `modelFuture` and `foldMetricFuture` together, and use `atomicInteger` to statistic complete fitting tasks and when all done, trigger `trainingDataset.unpersist`. I ever commented this issue on the old PR [SPARK-19357] https://github.com/apache/spark/pull/16774#pullrequestreview-53674264 unfortunately, at that time I do not realize that the issue still exists, but now I confirm it and create this PR to fix it. ## Discussion I give 3 approaches which we can compare, after discussion I realized none of them is ideal, we have to make a trade-off. **After discussion with jkbradley , choose approach 3** ### Approach 1 ~~The approach proposed by MrBago at~~ https://github.com/apache/spark/pull/19904#discussion_r156751569 ~~This approach resolve the model objects referenced issue, allow the model objects to be GCed in time. **BUT, in some cases, it still do not resolve the O(N) model memory occupation issue**. Let me use an extreme case to describe it:~~ ~~suppose we set `parallelism = 1`, and there're 100 paramMaps. So we have 100 fitting & evaluation tasks. In this approach, because of `parallelism = 1`, the code have to wait 100 fitting tasks complete, **(at this time the memory occupation by models already reach 100 * sizeof(model) )** and then it will unpersist training dataset and then do 100 evaluation tasks.~~ ### Approach 2 ~~This approach is my PR old version code~~ https://github.com/apache/spark/pull/19904/commits/2cc7c28f385009570536690d686f2843485942b2 ~~This approach can make sure at any case, the peak memory occupation by models to be `O(numParallelism * sizeof(model))`, but, it exists an issue that, in some extreme case, the "unpersist training dataset" will be delayed until most of the evaluation tasks complete. Suppose the case `parallelism = 1`, and there're 100 fitting & evaluation tasks, each fitting&evaluation task have to be executed one by one, so only after the first 99 fitting&evaluation tasks and the 100th fitting task complete, the "unpersist training dataset" will be triggered.~~ ### Approach 3 After I compared approach 1 and approach 2, I realized that, in the case which parallelism is low but there're many fitting & evaluation tasks, we cannot achieve both of the following two goals: - Make the peak memory occupation by models(driver-side) to be O(parallelism * sizeof(model)) - unpersist training dataset before most of the evaluation tasks started. So I vote for a simpler approach, move the unpersist training dataset to the end (Does this really matters ?) Because the goal 1 is more important, we must make sure the peak memory occupation by models (driver-side) to be O(parallelism * sizeof(model)), otherwise it will bring high risk of OOM. Like following code: ``` val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) => Future[Double] { val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]] //...other minor codes val metric = eval.evaluate(model.transform(validationDataset, paramMap)) logDebug(s"Got metric metricformodeltrainedwithparamMap.") metric } (executionContext) } val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf)) trainingDataset.unpersist() // <------- unpersist at the end validationDataset.unpersist() ``` ## How was this patch tested? N/A Author: WeichenXu <weichen...@databricks.com> Closes #19904 from WeichenXu123/fix_cross_validator_memory_issue. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fba03133 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fba03133 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fba03133 Branch: refs/heads/master Commit: fba03133d1369ce8cfebd6ae68b987a95f1b7ea1 Parents: 0bf1a74 Author: WeichenXu <weichen...@databricks.com> Authored: Sun Dec 24 22:57:53 2017 -0800 Committer: Joseph K. Bradley <jos...@databricks.com> Committed: Sun Dec 24 22:57:53 2017 -0800 ---------------------------------------------------------------------- .../apache/spark/ml/tuning/CrossValidator.scala | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/fba03133/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 1682ca9..0130b3e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -147,24 +147,12 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) logDebug(s"Train split $splitIndex with multiple sets of parameters.") // Fit models in a Future for training in parallel - val modelFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) => - Future[Model[_]] { + val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) => + Future[Double] { val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]] - if (collectSubModelsParam) { subModels.get(splitIndex)(paramIndex) = model } - model - } (executionContext) - } - - // Unpersist training data only when all models have trained - Future.sequence[Model[_], Iterable](modelFutures)(implicitly, executionContext) - .onComplete { _ => trainingDataset.unpersist() } (executionContext) - - // Evaluate models in a Future that will calulate a metric and allow model to be cleaned up - val foldMetricFutures = modelFutures.zip(epm).map { case (modelFuture, paramMap) => - modelFuture.map { model => // TODO: duplicate evaluator to take extra params from input val metric = eval.evaluate(model.transform(validationDataset, paramMap)) logDebug(s"Got metric $metric for model trained with $paramMap.") @@ -174,6 +162,7 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) // Wait for metrics to be calculated before unpersisting validation dataset val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf)) + trainingDataset.unpersist() validationDataset.unpersist() foldMetrics }.transpose.map(_.sum / $(numFolds)) // Calculate average metric over all splits --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org