Github user sethah commented on a diff in the pull request:
https://github.com/apache/spark/pull/19904#discussion_r155710913
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala ---
@@ -146,25 +147,18 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
      val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
      logDebug(s"Train split $splitIndex with multiple sets of parameters.")
+     val completeFitCount = new AtomicInteger(0)
--- End diff ---
My understanding of Scala futures may be off here, but this seems to change
the behavior: the unpersist operation will now happen inline in one of the
training threads, instead of asynchronously in its own thread. I'm not sure
how much of an effect that will have.
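To make that concrete, here is roughly the counter-based shape I'm reading into the diff (a sketch only; apart from `completeFitCount`, everything below is guessed from the surrounding code, not copied from the PR):

````scala
// Hypothetical sketch of the counter-based pattern, not the PR's actual code:
// the last fit to finish performs the unpersist inline on its own thread.
import java.util.concurrent.atomic.AtomicInteger

val completeFitCount = new AtomicInteger(0)
val fitFutures = epm.map { paramMap =>
  Future[Double] {
    val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
    val metric = eval.evaluate(model.transform(validationDataset, paramMap))
    // incrementAndGet is atomic, so exactly one task observes the final count
    if (completeFitCount.incrementAndGet() == epm.length) {
      trainingDataset.unpersist() // runs inside a training thread
    }
    metric
  } (executionContext)
}
````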
Why can't you just put all the logic in one map statement like below:
````scala
// Fit models in a Future for training in parallel
val metricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
  Future[Double] {
    val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
    if (collectSubModelsParam) {
      subModels.get(splitIndex)(paramIndex) = model
    }
    // TODO: duplicate evaluator to take extra params from input
    val metric = eval.evaluate(model.transform(validationDataset, paramMap))
    logDebug(s"Got metric $metric for model trained with $paramMap.")
    metric
  } (executionContext)
}

// Unpersist training data only when all models have trained
Future.sequence[Double, Iterable](metricFutures)(implicitly, executionContext)
  .onComplete { _ => trainingDataset.unpersist() } (executionContext)
````
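For what it's worth, here is a self-contained sketch of the `Future.sequence`/`onComplete` behavior I have in mind (all names are made up for the demo; nothing below is Spark code):

````scala
import java.util.concurrent.Executors

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future, Promise}

object UnpersistOnCompleteDemo {
  def main(args: Array[String]): Unit = {
    // Fixed-size pool standing in for the CrossValidator's executionContext
    val executionContext =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

    // Stand-ins for the per-paramMap training tasks
    val taskFutures = (1 to 8).map { i =>
      Future { Thread.sleep(50); i.toDouble }(executionContext)
    }

    val done = Promise[Unit]()
    // The callback is submitted to executionContext as its own task once
    // every future has finished; it does not run inline in a task body.
    Future.sequence(taskFutures)(implicitly, executionContext)
      .onComplete { _ =>
        println("all tasks done; unpersist would be safe here")
        done.success(())
      }(executionContext)

    Await.result(done.future, Duration.Inf)
    executionContext.shutdown()
  }
}
````

Either way the cleanup ends up on a pool thread; the difference, as I read it, is that `onComplete` schedules it as a separate task rather than piggybacking on the last fit.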