HyukjinKwon commented on code in PR #50867:
URL: https://github.com/apache/spark/pull/50867#discussion_r2125391970


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala:
##########
@@ -169,24 +169,64 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with 
Logging {
     e.exists(PythonUDF.isScalarPythonUDF)
   }
 
+  /**
+   * Return true if we should extract the current expression, including all of 
its current
+   * children (including UDF expression, and all others), to a logical node.
+   * The children of the expression can be udf expressions, this would be 
`chaining`.
+   * If child udf expressions were already extracted before, then this will 
just extract
+   * the current udf expression, so they will end up in separate logical 
nodes. The child
+   * expressions will have been transformed to Attribute expressions 
referencing the child plan
+   * node's output.
+   *
+   * Return false if there is no single continuous chain of UDFs that can be 
extracted:
+   * - if there are other expression in-between, e.g., foo(1 + bar(baz())), 
return false. The
+   *   caller will have to extract bar(baz()) separately first.
+   * - if the eval types of the udf expressions in the chain differ, return 
false.
+   * - if a UDF has more than one child, e.g. foo(bar(), baz()), return false
+   * If we return false here, the expectation is that the recursive calls of
+   * collectEvaluableUDFsFromExpressions will then visit the children and 
extract them first to
+   * separate nodes.
+   */
   @scala.annotation.tailrec
-  private def canEvaluateInPython(e: PythonUDF): Boolean = {
+  private def shouldExtractUDFExpressionTree(e: PythonUDF): Boolean = {
     e.children match {
-      // single PythonUDF child could be chained and evaluated in Python
-      case Seq(u: PythonUDF) => correctEvalType(e) == correctEvalType(u) && 
canEvaluateInPython(u)
+      case Seq(child: PythonUDF) => correctEvalType(e) == 
correctEvalType(child) &&
+        shouldExtractUDFExpressionTree(child)
       // Python UDF can't be evaluated directly in JVM
       case children => !children.exists(hasScalarPythonUDF)
     }
   }
 
+  /**
+   * We use the following terminology:
+   * - fusing is the act of combining multiple UDFs into a single logical 
node. This can be
+   *   accomplished in different cases:
+   *   - if the UDFs are siblings, e.g., foo(x), bar(x) - we refer to this as 
parallel fusing,
+   *     where multiple independent UDFs are evaluated together over the same 
input.
+   *   - if the UDFs are nested, e.g., foo(bar(...)) - we refer to this as 
chained fusing
+   *     or chaining, where the output of one UDF feeds into the next in a 
sequential pipeline.
+   *
+   * collectEvaluableUDFsFromExpressions returns a list of UDF expressions 
that can be planned
+   * together into one plan node. collectEvaluableUDFsFromExpressions will be 
called multiple times
+   * by recursive calls of extract(plan), until no more evaluable UDFs are 
found.
+   *
+   * As an example, consider the following expression tree:
+   * udf1(udf2(udf3(x)), udf4(x))), where all UDFs are PythonUDFs of the same 
evaltype.
+   * We can only fuse UDFs of the same eval type, and never UDFs of 
SQL_SCALAR_PANDAS_ITER_UDF.
+   * The following udf expressions will be returned:
+   * - First, we will return Seq(udf3, udf4), as these two UDFs must be 
evaluated first.
+   *   We return both in one Seq, as it is possible to do parallel fusing for 
udf3 an udf4.
+   * - As we can only chain UDFs with exactly one child, we will not fuse udf2 
with its children.
+   *   But we can chain udf1 and udf2, so a later call to 
collectEvaluableUDFsFromExpressions will
+   *   return Seq(udf1, udf2).
+   */
   private def collectEvaluableUDFsFromExpressions(expressions: 
Seq[Expression]): Seq[PythonUDF] = {
     // If first UDF is SQL_SCALAR_PANDAS_ITER_UDF, then only return this UDF,
-    // otherwise check if subsequent UDFs are of the same type as the first 
UDF. (since we can only
-    // extract UDFs of the same eval type)
+    // otherwise check if subsequent UDFs are of the same type as the first 
UDF.
 
     var firstVisitedScalarUDFEvalType: Option[Int] = None
 
-    def canChainUDF(evalType: Int): Boolean = {
+    def canFuseWithParallelUDFs(evalType: Int): Boolean = {

Review Comment:
   I think the term `chain` makes more sense because they are chained in Python 
workers when they are executed. Fuse sounds more like merging multiple 
functions into one function



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to