xinrong-meng opened a new pull request, #49055:
URL: https://github.com/apache/spark/pull/49055

   ### What changes were proposed in this pull request?
   Support DataFrame conversion to table arguments in Spark Classic, and enable 
UDTFs to accept table arguments in both PySpark and Scala.
   
   ### Why are the changes needed?
   Table-Valued Functions (TVFs) and User-Defined Table Functions (UDTFs) are 
widely used in Spark workflows. These functions often require a table argument, 
which Spark internally represents as a Catalyst expression. While Spark SQL 
supports constructs like TABLE(<query>) for this purpose, **there is no direct 
API in PySpark or Scala to convert a DataFrame into a table argument**. So we 
propose to support DataFrame conversion to table arguments (in Spark Classic 
first), and enable UDTFs to accept table arguments in both PySpark and Scala..
   
   ### Does this PR introduce _any_ user-facing change?
   Yes DataFrame conversion to table argument is supported in Spark Classic, 
and UDTFs accept table arguments in both PySpark and Scala.
   
   ```py
   >>> from pyspark.sql.functions import udtf
   >>> from pyspark.sql import Row
   >>> 
   >>> @udtf(returnType="a: int")
   ... class TestUDTF:
   ...     def eval(self, row: Row):
   ...         if row[0] > 5:
   ...             yield row[0],
   ...     def terminate(self):
   ...         pass
   ... 
   >>> df = spark.range(8)
   >>> 
   >>> TestUDTF(df.forTableFunction()).show()
   +---+                                                                        
   
   |  a|
   +---+
   |  6|
   |  7|
   +---+
   
   >>> TestUDTF(df.forTableFunction().partitionBy(df.id)).show()
   +---+
   |  a|
   +---+
   |  6|
   |  7|
   +---+
   
   >>> TestUDTF(df.forTableFunction().partitionBy(df.id).orderBy(df.id)).show()
   +---+
   |  a|
   +---+
   |  6|
   |  7|
   +---+
   
   >>> TestUDTF(df.forTableFunction().withSinglePartition()).show()
   +---+
   |  a|
   +---+
   |  6|
   |  7|
   +---+
   
   >>> 
TestUDTF(df.forTableFunction().partitionBy(df.id).withSinglePartition()).show()
   Traceback (most recent call last):
   ...
   pyspark.errors.exceptions.captured.IllegalArgumentException: Cannot call 
withSinglePartition() after partitionBy() has been called.
   ```
   
   ### How was this patch tested?
   [TO ADD]
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to