dtenedor commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1350991334


##########
python/pyspark/worker.py:
##########
@@ -693,6 +698,21 @@ def read_udtf(pickleSer, infile, eval_type):
             f"The return type of a UDTF must be a struct type, but got 
{type(return_type)}."
         )
 
+    # Update the handler that creates a new UDTF instance to first try calling the UDTF constructor
+    # with one argument containing the previous AnalyzeResult. If that fails, then try a constructor
+    # with no arguments. In this way each UDTF class instance can decide if it wants to inspect the
+    # AnalyzeResult.
+    if has_pickled_analyze_result:
+        prev_handler = handler
+
+        def construct_udtf():
+            try:
+                return prev_handler(pickled_analyze_result)

Review Comment:
   Sounds good, done.
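
   For reference, since the hunk above is truncated before the fallback branch, here is a minimal self-contained sketch of the full pattern. It assumes a `TypeError` from the one-argument call signals that the UDTF constructor takes no arguments; `make_construct_udtf` is a hypothetical helper for illustration, not the worker's actual code.

```python
def make_construct_udtf(prev_handler, pickled_analyze_result):
    """Wrap a UDTF-instantiation handler with the AnalyzeResult fallback."""

    def construct_udtf():
        try:
            # First try a constructor that accepts the previous AnalyzeResult.
            return prev_handler(pickled_analyze_result)
        except TypeError:
            # Fall back to a no-argument constructor for UDTF classes that
            # choose not to inspect the AnalyzeResult.
            return prev_handler()

    return construct_udtf
```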



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -284,6 +285,16 @@ object UserDefinedPythonTableFunction {
       val schema = DataType.fromJson(
         PythonWorkerUtils.readUTF(length, dataIn)).asInstanceOf[StructType]
 
+      // Receive the pickled AnalyzeResult buffer, if any.
+      val pickledAnalyzeResult: Option[Array[Byte]] = dataIn.readInt() match {
+        case 0 =>
+          None

Review Comment:
   This should not happen unless the UDTF does not include an `analyze` method, in which case there is no pickled `AnalyzeResult` buffer.
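
   For illustration, a minimal Python sketch of the framing this match arm reads, assuming a 4-byte big-endian length prefix (as Java's `DataInputStream.readInt` consumes) where zero means no buffer follows; `read_optional_buffer` is a hypothetical helper, not Spark's actual worker code.

```python
import struct


def read_optional_buffer(infile):
    # Read a 4-byte big-endian length prefix; zero means the UDTF has no
    # 'analyze' method, so no pickled AnalyzeResult buffer was sent.
    (length,) = struct.unpack(">i", infile.read(4))
    if length == 0:
        return None
    return infile.read(length)
```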



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -241,12 +248,17 @@ case class UnresolvedPolymorphicPythonUDTF(
  * @param orderByExpressions if non-empty, this contains the list of ordering items that the
  *                           'analyze' method explicitly indicated that the UDTF call should consume
  *                           the input table rows by
+ * @param pickledAnalyzeResult this is the pickled 'AnalyzeResult' instance from the UDTF, which
+ *                             contains all metadata returned by the Python UDTF 'analyze' method
+ *                             including the result schema of the function call as well as optional
+ *                             other information
  */
 case class PythonUDTFAnalyzeResult(
     schema: StructType,
     withSinglePartition: Boolean,
     partitionByExpressions: Seq[Expression],
-    orderByExpressions: Seq[SortOrder]) {
+    orderByExpressions: Seq[SortOrder],
+    pickledAnalyzeResult: Option[Array[Byte]]) {

Review Comment:
   We need this because if the UDTF does not include an `analyze` method, this buffer is empty, and the corresponding tests for such functions fail unless we check that the buffer is non-empty before using it.



##########
python/pyspark/sql/udtf.py:
##########
@@ -85,7 +85,7 @@ class OrderingColumn:
     overrideNullsFirst: Optional[bool] = None
 
 
-@dataclass(frozen=True)
+@dataclass

Review Comment:
   I found it necessary to allow UDTFs to create subclasses of `AnalyzeResult`. This is how we plan for these functions to compute custom state in the `analyze` method just once per function call and then propagate that information to future class instances for consumption by the `eval` and `terminate` methods.
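
   As a hedged sketch of that intended usage (the class name `MyAnalyzeResult`, the field `buffer_size`, and the exact constructor contract are assumptions for illustration, not part of this PR):

```python
from dataclasses import dataclass

from pyspark.sql.functions import udtf
from pyspark.sql.types import IntegerType, StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult


@dataclass
class MyAnalyzeResult(AnalyzeResult):
    # Custom state computed once per function call in 'analyze'.
    buffer_size: int = 0


class MyUDTF:
    def __init__(self, analyze_result=None):
        # Per the worker change in this PR, the constructor is first tried
        # with the previous AnalyzeResult, then with no arguments.
        self._buffer_size = analyze_result.buffer_size if analyze_result else 0

    @staticmethod
    def analyze(arg: AnalyzeArgument) -> "MyAnalyzeResult":
        # Compute custom state once and return it alongside the result schema.
        return MyAnalyzeResult(
            schema=StructType().add("result", IntegerType()),
            buffer_size=42,
        )

    def eval(self, arg):
        # Later instances consume the propagated state here.
        yield (self._buffer_size,)


my_udtf = udtf(MyUDTF)
```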


