ueshin commented on code in PR #41948:
URL: https://github.com/apache/spark/pull/41948#discussion_r1267104749
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -91,3 +138,123 @@ case class UserDefinedPythonTableFunction(
Dataset.ofRows(session, udtf)
}
}
+
+object UserDefinedPythonTableFunction {
+
+ private[this] val workerModule = "pyspark.sql.worker.analyze_udtf"
+
+ /**
+ * Runs the Python UDTF's `analyze` static method.
+ *
+ * When the Python UDTF is defined without a static return type,
+ * the analyze will call this while resolving table-valued functions.
+ *
+ * This expects the Python UDTF to have `analyze` static method that take arguments:
+ *
+ * - The number and order of arguments are the same as the UDTF inputs
+ * - Each argument is an `AnalyzeArgument`, containing:
+ * - data_type: DataType
+ * - value: Any: if the argument is foldable; otherwise None
+ * - is_table: bool: True if the argument is TABLE
+ *
+ * and that return an `AnalyzeResult`.
+ *
+ * It serializes/deserializes the data types via JSON,
+ * and the values for the case the argument is foldable are pickled.
+ *
+ * `AnalysisException` with the error class "TABLE_VALUED_FUNCTION_FAILED_TO_ANALYZE_IN_PYTHON"
+ * will be thrown when an exception is raised in Python.
+ */
+ def analyzeInPython(
+ tableArgs: Seq[Boolean])(func: PythonFunction, exprs: Seq[Expression]): StructType = {
+ val env = SparkEnv.get
+ val bufferSize: Int = env.conf.get(BUFFER_SIZE)
+ val authSocketTimeout = env.conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+ val reuseWorker = env.conf.get(PYTHON_WORKER_REUSE)
+ val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
+
+ val envVars = new HashMap[String, String](func.envVars)
+ val pythonExec = func.pythonExec
Review Comment:
That's a good point. Let's discuss it in a separate PR.
`PYSPARK_DRIVER_PYTHON` is also used to launch PySpark on Jupyter, so it is not
always a plain Python interpreter. We might want to add another variable, such as
`driverPythonExec`, that takes its value from `sys.executable` or something similar.
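
To make the concern concrete, here is a rough sketch of how a driver-side executable could be chosen on the Python side. Everything in it is an assumption for illustration: `resolve_driver_python_exec` and `NOTEBOOK_LAUNCHERS` are hypothetical names, the launcher list is a guess, and nothing here is part of Spark or of this PR. It only shows the idea of falling back to `sys.executable` when `PYSPARK_DRIVER_PYTHON` points at a notebook launcher rather than an interpreter.

```python
import os
import sys

# Hypothetical list of launchers that are not plain Python interpreters.
# This is an illustrative guess, not something Spark defines.
NOTEBOOK_LAUNCHERS = {"jupyter", "jupyter-notebook", "jupyter-lab", "ipython"}


def resolve_driver_python_exec(env=None, fallback=None):
    """Pick an interpreter path for a driver-side Python worker.

    Hypothetical helper: prefers PYSPARK_DRIVER_PYTHON, but falls back to
    the running interpreter (sys.executable) when that variable is unset
    or names a notebook launcher.
    """
    env = os.environ if env is None else env
    fallback = sys.executable if fallback is None else fallback

    configured = env.get("PYSPARK_DRIVER_PYTHON")
    if not configured:
        return fallback

    # Compare on the bare command name, ignoring directories and ".exe".
    name = os.path.basename(configured).lower()
    if name.endswith(".exe"):
        name = name[:-4]

    if name in NOTEBOOK_LAUNCHERS:
        return fallback
    return configured
```

With `PYSPARK_DRIVER_PYTHON=jupyter`, this sketch would return the interpreter that is already running the driver, which is roughly what a `driverPythonExec` derived from `sys.executable` would give. Whether the value should be captured on the Python side and passed to the JVM is left for the follow-up PR.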