ueshin commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1346427159


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -290,6 +295,20 @@ object UserDefinedPythonTableFunction {
           val msg = new String(obj, StandardCharsets.UTF_8)
           throw 
QueryCompilationErrors.tableValuedFunctionFailedToAnalyseInPythonError(msg)
       }
+      // Receive the "prepare_buffer" string, if any.
+      val prepareBuffer: String = dataIn.readInt() match {
+        case length: Int if length >= 0 =>
+          val obj = new Array[Byte](length)
+          dataIn.readFully(obj)
+          new String(obj, StandardCharsets.UTF_8)
+
+        case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+          val exLength = dataIn.readInt()
+          val obj = new Array[Byte](exLength)
+          dataIn.readFully(obj)
+          val msg = new String(obj, StandardCharsets.UTF_8)
+          throw 
QueryCompilationErrors.tableValuedFunctionFailedToAnalyseInPythonError(msg)

Review Comment:
   Why do we need this part?



##########
python/pyspark/sql/udtf.py:
##########
@@ -107,12 +107,20 @@ class AnalyzeResult:
         If non-empty, this is a sequence of columns that the UDTF is 
specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partition_by' list 
must also be non-empty
         in this case.
+    prepare_buffer: str
+        If non-empty, this string represents state computed once within the 
'analyze' method to be
+        propagated to each instance of the UDTF class at the time of its 
creation, using its
+        'prepare' method. The format this buffer is opaque and known only to 
the data source. Common
+        use cases include serializing protocol buffers or JSON configurations 
into this buffer so
+        that potentially expensive initialization work done at 'analyze' time 
does not need to be
+        recomputed later.
     """
 
     schema: StructType
     with_single_partition: bool = False
     partition_by: Sequence[PartitioningColumn] = field(default_factory=tuple)
     order_by: Sequence[OrderingColumn] = field(default_factory=tuple)
+    prepare_buffer: str = ""

Review Comment:
   I guess we should distinguish an empty string and `None`.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -167,22 +169,26 @@ abstract class UnevaluableGenerator extends Generator {
  * @param udfDeterministic true if this function is deterministic wherein it 
returns the same result
  *                         rows for every call with the same input arguments
  * @param resultId unique expression ID for this function invocation
- * @param pythonUDTFPartitionColumnIndexes holds the indexes of the TABLE 
argument to the Python
- *                                         UDTF call, if applicable
+ * @param pythonUDTFPartitionColumnIndexes holds the zero-based indexes of the 
projected results of
+ *                                         all PARTITION BY expressions within 
the TABLE argument of
+ *                                         the Python UDTF call, if applicable
  * @param analyzeResult holds the result of the polymorphic Python UDTF 
'analze' method, if the UDTF
  *                      defined one
  */
 case class PythonUDTF(
     name: String,
     func: PythonFunction,
-    elementSchema: StructType,
+    analyzeResult: PythonUDTFAnalyzeResult,

Review Comment:
   I'm not sure if we should have the whole `PythonUDTFAnalyzeResult` here..



##########
python/pyspark/worker.py:
##########
@@ -786,6 +787,24 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             else:
                 return arg
 
+    # Wrap the UDTF handler to call the "prepare" method if there was a 
non-empty "prepare_buffer"
+    # string returned from the "analyze" method earlier.
+    if len(prepare_buffer) > 0:
+        prev_handler = handler
+
+        def handler_with_prepare():
+            new_udtf = prev_handler()
+            if new_udtf.prepare is None:
+                raise PySparkRuntimeError(
+                    "The 'analyze' method returned a non-empty 
'prepare_buffer' string, but the "

Review Comment:
   We should use some error class here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to