dtenedor commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1350991334
##########
python/pyspark/worker.py:
##########
@@ -693,6 +698,21 @@ def read_udtf(pickleSer, infile, eval_type):
f"The return type of a UDTF must be a struct type, but got
{type(return_type)}."
)
+    # Update the handler that creates a new UDTF instance to first try calling the
+    # UDTF constructor with one argument containing the previous AnalyzeResult. If
+    # that fails, then try a constructor with no arguments. In this way each UDTF
+    # class instance can decide if it wants to inspect the AnalyzeResult.
+    if has_pickled_analyze_result:
+        prev_handler = handler
+
+        def construct_udtf():
+            try:
+                return prev_handler(pickled_analyze_result)
Review Comment:
Sounds good, done.
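For reference, a minimal sketch of the fallback this hunk implements (the quoted diff cuts off before the `except` clause, so catching `TypeError` is an assumption here, and `make_construct_udtf` is a hypothetical wrapper around the names the hunk uses):

```python
def make_construct_udtf(prev_handler, pickled_analyze_result):
    # Wrap the previous handler so each new UDTF instance is built with the
    # two-step constructor protocol described in the comment above.
    def construct_udtf():
        try:
            # Prefer the one-argument constructor so the UDTF instance can
            # inspect the AnalyzeResult returned by the earlier 'analyze' call.
            return prev_handler(pickled_analyze_result)
        except TypeError:
            # The UDTF declares a zero-argument constructor instead; build it
            # without the AnalyzeResult.
            return prev_handler()
    return construct_udtf
```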
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -284,6 +285,16 @@ object UserDefinedPythonTableFunction {
      val schema = DataType.fromJson(
        PythonWorkerUtils.readUTF(length, dataIn)).asInstanceOf[StructType]
+     // Receive the pickled AnalyzeResult buffer, if any.
+     val pickledAnalyzeResult: Option[Array[Byte]] = dataIn.readInt() match {
+       case 0 =>
+         None
Review Comment:
This should not happen unless the UDTF does not include an `analyze` method, in
which case there is no pickled `AnalyzeResult` buffer.
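As a hedged illustration of the framing being read here: the integer is a length
prefix, with zero meaning "no pickled `AnalyzeResult`". A hypothetical
Python-side writer (`write_optional_buffer` is not the actual worker helper)
might look like:

```python
import struct

def write_optional_buffer(outfile, buf):
    # Java's DataInputStream.readInt() is big-endian, hence "!i".
    if buf is None:
        # A zero length tells the JVM there is no pickled AnalyzeResult,
        # e.g. when the UDTF has no 'analyze' method.
        outfile.write(struct.pack("!i", 0))
    else:
        outfile.write(struct.pack("!i", len(buf)))
        outfile.write(buf)
```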
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -241,12 +248,17 @@ case class UnresolvedPolymorphicPythonUDTF(
 * @param orderByExpressions if non-empty, this contains the list of ordering items that the
 *                           'analyze' method explicitly indicated that the UDTF call should
 *                           consume the input table rows by
+ * @param pickledAnalyzeResult this is the pickled 'AnalyzeResult' instance from the UDTF, which
+ *                             contains all metadata returned by the Python UDTF 'analyze' method,
+ *                             including the result schema of the function call as well as other
+ *                             optional information
*/
 case class PythonUDTFAnalyzeResult(
     schema: StructType,
     withSinglePartition: Boolean,
     partitionByExpressions: Seq[Expression],
-    orderByExpressions: Seq[SortOrder]) {
+    orderByExpressions: Seq[SortOrder],
+    pickledAnalyzeResult: Option[Array[Byte]]) {
Review Comment:
I found we need this because, if the UDTF does not include an `analyze` method,
this buffer is empty; the corresponding tests using these functions fail unless
we check that the buffer is non-empty before using it.
##########
python/pyspark/sql/udtf.py:
##########
@@ -85,7 +85,7 @@ class OrderingColumn:
overrideNullsFirst: Optional[bool] = None
-@dataclass(frozen=True)
+@dataclass
Review Comment:
I found this necessary to allow UDTFs to create subclasses of `AnalyzeResult`.
This is how we plan for these functions to compute custom state in the
`analyze` method just once per function call and then propagate it to future
class instances for the `eval` and `terminate` methods to consume; see the
sketch below.
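A minimal sketch of that pattern (`MyAnalyzeResult`, its `multiplier` field, and
`MultiplyUDTF` are invented for illustration; `AnalyzeResult` and `udtf` are the
existing PySpark APIs this PR builds on):

```python
from dataclasses import dataclass

from pyspark.sql.functions import udtf
from pyspark.sql.types import IntegerType, StructType
from pyspark.sql.udtf import AnalyzeResult

@dataclass
class MyAnalyzeResult(AnalyzeResult):
    # Hypothetical custom state computed once per function call in 'analyze'.
    multiplier: int = 1

@udtf
class MultiplyUDTF:
    def __init__(self, analyze_result=None):
        # The worker tries the one-argument constructor first, so instances
        # created after analysis receive the MyAnalyzeResult computed below.
        self._multiplier = (
            analyze_result.multiplier if analyze_result is not None else 1
        )

    @staticmethod
    def analyze(*args):
        # Compute the result schema plus the custom state exactly once.
        return MyAnalyzeResult(
            schema=StructType().add("result", IntegerType()),
            multiplier=2,
        )

    def eval(self, x: int):
        # Consume the state propagated from 'analyze'.
        yield (x * self._multiplier,)
```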
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]