smurching commented on a change in pull request #28986:
URL: https://github.com/apache/spark/pull/28986#discussion_r461168188
##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -2554,6 +2557,19 @@ object SparkContext extends Logging {
}
}
+  /**
+   * Called to ensure that SparkContext is created or accessed only on the Driver.
+   *
+   * Throws an exception if a SparkContext is about to be created in executors.
+   */
+  private def assertOnDriver(): Unit = {
+    if (TaskContext.get != null) {
Review comment:
BTW, I'd been discussing this offline with @ueshin and @HyukjinKwon. One use
case that previously (and admittedly unfortunately) relied on the ability to
create `SparkContext`s in the executors is MLflow's `mlflow.pyfunc.spark_udf`
API ([see docs & usage
example](https://www.mlflow.org/docs/latest/python_api/mlflow.pyfunc.html#mlflow.pyfunc.spark_udf)),
which provides a unified interface for scoring models trained in arbitrary ML
frameworks (TensorFlow, scikit-learn, pyspark.ml) on Spark DataFrames as a
pandas UDF. Most ML frameworks (e.g. scikit-learn) are single-node frameworks
that can operate on pandas DataFrames, so applying them via a pandas UDF works
well. For a pyspark.ml model to be applied via a pandas UDF, however, we need
to convert the input pandas Series into a Spark DataFrame on the executors,
which requires a SparkContext on the executors.
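To make that concrete, here's a rough sketch of the pattern (hypothetical and
simplified, not MLflow's actual implementation; the model path, the single
`features` column, and the feature handling are all made up for illustration):

```python
import pandas as pd
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

# Hypothetical model location and schema, purely for illustration.
model_path = "/models/my_pyspark_ml_model"

@pandas_udf("double")
def predict(features: pd.Series) -> pd.Series:
    # This body runs on an executor. getOrCreate() finds no active session
    # there, so it creates a new SparkContext on the executor, which is
    # exactly what assertOnDriver() now rejects.
    spark = SparkSession.builder.getOrCreate()
    model = PipelineModel.load(model_path)
    # pandas Series -> Spark DataFrame so the pyspark.ml model can score it
    # (real code would assemble a proper feature vector; skipped here), then
    # collect the predictions back into pandas for the UDF's return value.
    sdf = spark.createDataFrame(features.to_frame("features"))
    return model.transform(sdf).select("prediction").toPandas()["prediction"]
```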
I'll dig in to see if there's a way to keep the MLflow use case working with
this change (e.g. pyspark.ml models may ultimately perform inference via a
Spark UDF, so maybe we can somehow extract the underlying UDF from the model
and return that from `mlflow.pyfunc.spark_udf`?), but otherwise I agree that we
may want to revert this change.
##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -2554,6 +2557,19 @@ object SparkContext extends Logging {
}
}
+  /**
+   * Called to ensure that SparkContext is created or accessed only on the Driver.
+   *
+   * Throws an exception if a SparkContext is about to be created in executors.
+   */
+  private def assertOnDriver(): Unit = {
+    if (TaskContext.get != null) {
Review comment:
I also sympathize with the notion that it's an antipattern to create a
`SparkContext` on the executors, but I'm not sure there's always a workaround
that avoids doing so.
##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -2554,6 +2557,19 @@ object SparkContext extends Logging {
}
}
+  /**
+   * Called to ensure that SparkContext is created or accessed only on the Driver.
+   *
+   * Throws an exception if a SparkContext is about to be created in executors.
+   */
+  private def assertOnDriver(): Unit = {
+    if (TaskContext.get != null) {
Review comment:
@zsxwing got it, that makes sense! Yeah, MLflow's spark UDF works pretty
reliably AFAIK, at least for this use case :P. When scoring a pyspark.ml model
in a pandas UDF, we construct a SparkContext via the standard
`SparkSession.builder.getOrCreate` API ([see MLflow
code](https://github.com/mlflow/mlflow/blob/86ea84f9e6d58b702f362ff512f61cff8ada021f/mlflow/spark.py#L520-L521)).
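Roughly, the executor-side call boils down to something like this (paraphrased
sketch; the linked lines are the source of truth):

```python
from pyspark.sql import SparkSession

# On the executor there is no active session, so getOrCreate() spins up a new
# SparkContext on the worker, which is what this assertion would now block.
spark = SparkSession.builder.getOrCreate()
```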