smurching commented on a change in pull request #28986:
URL: https://github.com/apache/spark/pull/28986#discussion_r461168188



##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -2554,6 +2557,19 @@ object SparkContext extends Logging {
     }
   }
 
+  /**
+   * Called to ensure that SparkContext is created or accessed only on the Driver.
+   *
+   * Throws an exception if a SparkContext is about to be created in executors.
+   */
+  private def assertOnDriver(): Unit = {
+    if (TaskContext.get != null) {

Review comment:
       BTW, I had been discussing this offline with @ueshin and @HyukjinKwon - one use case that previously (and admittedly unfortunately) relied on the ability to create `SparkContext`s on the executors is MLflow's `mlflow.pyfunc.spark_udf` API ([see docs & usage example](https://www.mlflow.org/docs/latest/python_api/mlflow.pyfunc.html#mlflow.pyfunc.spark_udf)), which provides a unified interface for scoring models trained in arbitrary ML frameworks (tensorflow, scikit-learn, pyspark.ml) on Spark DataFrames as a pandas UDF. Most ML frameworks (e.g. sklearn) are single-node frameworks that operate on pandas DataFrames, so applying them via a pandas UDF works well. For a pyspark.ml model to be applied via a pandas UDF, however, we need to convert the input pandas Series to a Spark DataFrame on the executors, which requires a SparkContext on the executors.
   
   I'll dig in to see if there's a way to keep the MLflow use case working with this change (e.g. pyspark.ml models may ultimately perform inference via a Spark UDF, so maybe we can somehow extract the underlying UDF from the model and return that from `mlflow.pyfunc.spark_udf`?), but otherwise I agree that we may want to revert this change.
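   For concreteness, here's a rough sketch of the pattern described above - this is not MLflow's actual implementation, and `load_model` plus the `features`/`prediction` column names are hypothetical stand-ins:

```python
# Hypothetical sketch only -- not MLflow's actual code.
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def score(features: pd.Series) -> pd.Series:
    # This body runs on an executor inside the pandas UDF. Creating or
    # fetching a SparkSession here is the step this PR now disallows.
    spark = SparkSession.builder.getOrCreate()
    model = load_model()  # hypothetical loader for a pyspark.ml PipelineModel
    # Convert the incoming pandas data to a Spark DataFrame so the
    # pyspark.ml model can transform it, then collect the predictions back.
    sdf = spark.createDataFrame(pd.DataFrame({"features": features}))
    preds = model.transform(sdf).select("prediction").toPandas()
    return preds["prediction"]
```

   That inner convert-and-score step runs against an executor-local SparkContext, which is exactly what `assertOnDriver()` would now reject.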

##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -2554,6 +2557,19 @@ object SparkContext extends Logging {
     }
   }
 
+  /**
+   * Called to ensure that SparkContext is created or accessed only on the Driver.
+   *
+   * Throws an exception if a SparkContext is about to be created in executors.
+   */
+  private def assertOnDriver(): Unit = {
+    if (TaskContext.get != null) {

Review comment:
       I also sympathize with the notion that it's an antipattern to create a `SparkContext` on the executors, but I'm not sure there's always a workaround for doing so.

##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -2554,6 +2557,19 @@ object SparkContext extends Logging {
     }
   }
 
+  /**
+   * Called to ensure that SparkContext is created or accessed only on the Driver.
+   *
+   * Throws an exception if a SparkContext is about to be created in executors.
+   */
+  private def assertOnDriver(): Unit = {
+    if (TaskContext.get != null) {

Review comment:
       @zsxwing got it, makes sense! Yeah, MLflow's Spark UDF works pretty reliably AFAIK, at least for this use case :P. When scoring a pyspark.ml model in a pandas UDF, we construct a SparkContext via the standard `SparkSession.builder.getOrCreate` API ([see MLflow code](https://github.com/mlflow/mlflow/blob/86ea84f9e6d58b702f362ff512f61cff8ada021f/mlflow/spark.py#L520-L521)).
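   To make that concrete, here is a minimal sketch of the call pattern on the executor (the helper name is a hypothetical stand-in, not the linked MLflow code verbatim):

```python
import pandas as pd
from pyspark.sql import SparkSession

def _convert_on_executor(pdf: pd.DataFrame):
    # Hypothetical helper. On a worker, builder.getOrCreate() either reuses an
    # existing session or creates a new SparkContext - the latter is what
    # assertOnDriver() now turns into an exception.
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame(pdf)
```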



