ueshin commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1346427159
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -290,6 +295,20 @@ object UserDefinedPythonTableFunction {
      val msg = new String(obj, StandardCharsets.UTF_8)
      throw QueryCompilationErrors.tableValuedFunctionFailedToAnalyseInPythonError(msg)
    }
+    // Receive the "prepare_buffer" string, if any.
+    val prepareBuffer: String = dataIn.readInt() match {
+      case length: Int if length >= 0 =>
+        val obj = new Array[Byte](length)
+        dataIn.readFully(obj)
+        new String(obj, StandardCharsets.UTF_8)
+
+      case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+        val exLength = dataIn.readInt()
+        val obj = new Array[Byte](exLength)
+        dataIn.readFully(obj)
+        val msg = new String(obj, StandardCharsets.UTF_8)
+        throw QueryCompilationErrors.tableValuedFunctionFailedToAnalyseInPythonError(msg)
Review Comment:
Why do we need this part?
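For context on the framing this reader expects: the Python side writes a 4-byte length followed by UTF-8 bytes, or the `PYTHON_EXCEPTION_THROWN` sentinel followed by a length-prefixed traceback. A minimal sketch of that write side, assuming the existing `write_int`/`write_with_length`/`SpecialLengths` helpers from `pyspark.serializers` (the `write_prepare_buffer` wrapper itself is hypothetical, not code from this PR):

```python
import traceback

from pyspark.serializers import SpecialLengths, write_int, write_with_length


def write_prepare_buffer(prepare_buffer: str, outfile) -> None:
    # Hypothetical wrapper, for illustration only.
    try:
        # Happy path: a 4-byte length followed by the UTF-8 bytes, which
        # pairs with dataIn.readInt() + dataIn.readFully() on the JVM side.
        write_with_length(prepare_buffer.encode("utf-8"), outfile)
    except Exception:
        # Error path: the sentinel, then a length-prefixed message that the
        # JVM wraps in tableValuedFunctionFailedToAnalyseInPythonError.
        write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
        write_with_length(traceback.format_exc().encode("utf-8"), outfile)
```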
##########
python/pyspark/sql/udtf.py:
##########
@@ -107,12 +107,20 @@ class AnalyzeResult:
    If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to
    sort the input TABLE argument by. Note that the 'partition_by' list must also be non-empty
    in this case.
+ prepare_buffer: str
+     If non-empty, this string represents state computed once within the 'analyze' method to be
+     propagated to each instance of the UDTF class at the time of its creation, using its
+     'prepare' method. The format of this buffer is opaque and known only to the data source.
+     Common use cases include serializing protocol buffers or JSON configurations into this
+     buffer so that potentially expensive initialization work done at 'analyze' time does not
+     need to be recomputed later.
"""
schema: StructType
with_single_partition: bool = False
partition_by: Sequence[PartitioningColumn] = field(default_factory=tuple)
order_by: Sequence[OrderingColumn] = field(default_factory=tuple)
+ prepare_buffer: str = ""
Review Comment:
I guess we should distinguish between an empty string and `None`.
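For illustration, a sketch of the round trip the docstring above describes, written against the API proposed in this diff (the class name, schema, and JSON payload are invented):

```python
import json

from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult


class ConfiguredUDTF:
    @staticmethod
    def analyze(arg: AnalyzeArgument) -> AnalyzeResult:
        # Do the potentially expensive work once at analysis time and
        # serialize it into the opaque buffer.
        config = {"threshold": 42}
        return AnalyzeResult(
            schema=StructType().add("out", "integer"),
            prepare_buffer=json.dumps(config),
        )

    def prepare(self, prepare_buffer: str) -> None:
        # Per the proposed contract, called once per UDTF instance with
        # the buffer produced by 'analyze'.
        self.config = json.loads(prepare_buffer)

    def eval(self, arg: int):
        if arg > self.config["threshold"]:
            yield (arg,)
```

Note that with the `prepare_buffer: str = ""` default there is no way to tell "no buffer" apart from "an intentionally empty buffer", which is the distinction in question.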
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -167,22 +169,26 @@ abstract class UnevaluableGenerator extends Generator {
* @param udfDeterministic true if this function is deterministic wherein it returns the same result
*                         rows for every call with the same input arguments
* @param resultId unique expression ID for this function invocation
- * @param pythonUDTFPartitionColumnIndexes holds the indexes of the TABLE argument to the Python
- *                                         UDTF call, if applicable
+ * @param pythonUDTFPartitionColumnIndexes holds the zero-based indexes of the projected results
+ *                                         of all PARTITION BY expressions within the TABLE
+ *                                         argument of the Python UDTF call, if applicable
* @param analyzeResult holds the result of the polymorphic Python UDTF 'analyze' method, if the
*                      UDTF defined one
*/
case class PythonUDTF(
name: String,
func: PythonFunction,
- elementSchema: StructType,
+ analyzeResult: PythonUDTFAnalyzeResult,
Review Comment:
I'm not sure if we should have the whole `PythonUDTFAnalyzeResult` here...
##########
python/pyspark/worker.py:
##########
@@ -786,6 +787,24 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
else:
return arg
+    # Wrap the UDTF handler to call the "prepare" method if there was a non-empty
+    # "prepare_buffer" string returned from the "analyze" method earlier.
+    if len(prepare_buffer) > 0:
+        prev_handler = handler
+
+        def handler_with_prepare():
+            new_udtf = prev_handler()
+            if new_udtf.prepare is None:
+                raise PySparkRuntimeError(
+                    "The 'analyze' method returned a non-empty 'prepare_buffer' string, but the "
Review Comment:
We should use some error class here?
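For example, something along these lines; the error class name is hypothetical and would need a matching entry in `error_classes.py`, though `PySparkRuntimeError` does accept `error_class`/`message_parameters`:

```python
from pyspark.errors import PySparkRuntimeError

# Sketch only: "UDTF_PREPARE_METHOD_MISSING" is a hypothetical error class.
raise PySparkRuntimeError(
    error_class="UDTF_PREPARE_METHOD_MISSING",
    message_parameters={"buffer_length": str(len(prepare_buffer))},
)
```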