srowen commented on a change in pull request #32399:
URL: https://github.com/apache/spark/pull/32399#discussion_r631847666
##########
File path: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
##########
@@ -183,11 +194,27 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
}
// Wait for metrics to be calculated
- val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
-
- // Unpersist training & validation set once all metrics have been produced
- trainingDataset.unpersist()
- validationDataset.unpersist()
+ val foldMetrics = try {
+ foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
+ }
+ catch {
+ case e: Throwable =>
+ subTaskFailed = true
+ try {
+ Thread.sleep(1000)
+ sparkContext.cancelJobGroup(cvJobGroup)
+ } catch {
+ case _: Throwable => ()
+ }
+ throw e
+ }
Review comment:
Nit: put `finally` on this line.
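For comparison, here is a minimal Python sketch of the same await/cancel/re-raise pattern with cleanup in a `finally` clause, assuming the removed `unpersist()` calls end up in that `finally`. `evaluate_all`, `cancel_group`, and `cleanup` are hypothetical names standing in for the `awaitResult` loop, `sparkContext.cancelJobGroup(cvJobGroup)`, and the dataset `unpersist()` calls; the Scala code's one-second sleep before cancelling is omitted.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def evaluate_all(tasks, cancel_group, cleanup):
    """Run every task in parallel and collect results in order.

    On the first failure: set the shared flag, cancel the remaining work
    (best effort), and re-raise. Cleanup runs whether or not a task failed.
    cancel_group and cleanup are hypothetical stand-ins for
    cancelJobGroup(...) and the unpersist() calls.
    """
    failed = threading.Event()  # plays the role of `subTaskFailed`
    with ThreadPoolExecutor(max_workers=max(1, len(tasks))) as pool:
        futures = [pool.submit(task, failed) for task in tasks]
        try:
            # Like the Scala loop, wait for each result in turn.
            return [f.result() for f in futures]
        except BaseException:  # mirrors `case e: Throwable`
            failed.set()
            try:
                cancel_group()
            except Exception:
                pass  # never let a cancellation error mask the original one
            raise
        finally:
            cleanup()  # e.g. unpersist training & validation sets
```

Catching `BaseException` mirrors the Scala `Throwable` case, and the inner `try` keeps a failure in the cancellation step from hiding the error that actually triggered it.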
##########
File path: python/pyspark/util.py
##########
@@ -263,6 +264,69 @@ def _parse_memory(s):
return int(float(s[:-1]) * units[s[-1].lower()])
+def is_pinned_thread_mode():
+ """
+ Return ``True`` when Spark runs in pinned thread mode.
+ """
+ from pyspark import SparkContext
+ return isinstance(SparkContext._gateway, ClientServer)
+
+
+def inheritable_thread_target(f):
+ """
+ Return a thread target wrapper that is recommended for use in PySpark when
+ pinned thread mode is enabled. Before calling the original thread target,
+ the wrapper inherits the inheritable properties specific to the JVM thread,
+ such as ``InheritableThreadLocal``.
+
+ Also note that pinned thread mode does not close the connection from Python
+ to the JVM when the thread finishes on the Python side. With this wrapper,
+ Python garbage-collects the Python thread instance and also closes the
+ connection, which finishes the JVM thread correctly.
+
+ When pinned thread mode is off, it returns the original ``f``.
+
+ :param f: the original thread target.
+
+ .. versionadded:: 3.1.0
Review comment:
I'd also not backport unless it's essential - how bad is it?
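To illustrate what the docstring describes, here is a pure-Python analogue; it is not PySpark's implementation. A per-thread property dict stands in for JVM `InheritableThreadLocal` state (such as a job group id), and a hypothetical `on_exit` hook stands in for closing the Python-to-JVM connection. `set_property`, `get_property`, and `inheritable_target` are illustrative names only.

```python
import threading

# Hypothetical per-thread property store, standing in for JVM-side
# InheritableThreadLocal properties such as a job group id.
_local = threading.local()


def set_property(key, value):
    if not hasattr(_local, "props"):
        _local.props = {}
    _local.props[key] = value


def get_property(key):
    return getattr(_local, "props", {}).get(key)


def inheritable_target(f, on_exit=None):
    """Wrap thread target f so the child starts with a copy of the
    properties of the thread that created the wrapper.

    on_exit is a hypothetical hook standing in for closing the
    Python-to-JVM connection when the thread finishes.
    """
    parent_props = dict(getattr(_local, "props", {}))  # snapshot at wrap time

    def wrapper(*args, **kwargs):
        _local.props = dict(parent_props)  # install inherited copy in child
        try:
            return f(*args, **kwargs)
        finally:
            if on_exit is not None:
                on_exit()  # runs even if f raises

    return wrapper
```

A plain `threading.Thread` started without the wrapper sees no properties at all, which is the gap the wrapper is meant to close.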
##########
File path: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
##########
@@ -168,9 +169,19 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
val validationDataset = sparkSession.createDataFrame(validation, schemaWithoutFold).cache()
instr.logDebug(s"Train split $splitIndex with multiple sets of parameters.")
+ val sparkContext = sparkSession.sparkContext
+ val oldJobGroup = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
+ val cvJobGroup = s"${this.uid}_job_group"
+ sparkContext.setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, cvJobGroup)
+ @volatile var subTaskFailed = false
// Fit models in a Future for training in parallel
val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
Future[Double] {
+ if (subTaskFailed) {
+ throw new RuntimeException(
Review comment:
Does the exception type not matter here, so RuntimeException is OK? Otherwise
it's probably overly broad; at least use IllegalStateException or something
similar. But if it's just going to kill the Future, OK.
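If the exception only serves to abort the Future, its type hardly matters, but a narrower type documents intent and lets callers tell "skipped because a sibling failed" apart from a real error. A minimal Python sketch of the same fail-fast check, using a hypothetical `SubTaskSkipped` exception and a `threading.Event` in place of the `@volatile var subTaskFailed` flag:

```python
import threading


class SubTaskSkipped(RuntimeError):
    """Hypothetical narrow exception for tasks skipped after a sibling failed,
    analogous to throwing IllegalStateException instead of RuntimeException."""


def make_task(index, failed, work):
    """Return a task that refuses to start once the shared `failed` flag is set."""
    def task():
        if failed.is_set():
            raise SubTaskSkipped(f"task {index} skipped: another task failed")
        return work(index)
    return task
```

Because `SubTaskSkipped` subclasses `RuntimeError`, code that only catches the broad type still works, while code that cares can catch the specific one.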
##########
File path: .idea/vcs.xml
##########
@@ -1,36 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
Review comment:
Is removing this file intended? It looks unrelated.
##########
File path: mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
##########
@@ -161,11 +174,28 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St
}
// Wait for all metrics to be calculated
- val metrics = metricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
-
- // Unpersist training & validation set once all metrics have been produced
- trainingDataset.unpersist()
- validationDataset.unpersist()
+ val metrics = try {
+ metricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
+ }
+ catch {
+ case e: Throwable =>
+ subTaskFailed = true
+ try {
+ Thread.sleep(1000)
+ val sparkContext = sparkSession.sparkContext
+ sparkContext.cancelJobGroup(tvsJobGroup)
+ } catch {
+ case _: Throwable => ()
Review comment:
Nit: is `()` necessary here, versus leaving the case body empty?
--
This is an automated message from the Apache Git Service.