GitHub user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6712#discussion_r32089187
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala ---
    @@ -71,43 +71,49 @@ private[spark] case class PythonUDF(
     private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] {
       def apply(plan: LogicalPlan): LogicalPlan = plan transform {
         // Skip EvaluatePython nodes.
    -    case p: EvaluatePython => p
    +    case plan: EvaluatePython => plan
     
    -    case l: LogicalPlan =>
    +    case plan: LogicalPlan =>
           // Extract any PythonUDFs from the current operator.
    -      val udfs = l.expressions.flatMap(_.collect { case udf: PythonUDF => 
udf})
    +      val udfs = plan.expressions.flatMap(_.collect { case udf: PythonUDF 
=> udf })
           if (udfs.isEmpty) {
             // If there aren't any, we are done.
    -        l
    +        plan
           } else {
             // Pick the UDF we are going to evaluate (TODO: Support evaluating 
multiple UDFs at a time)
             // If there is more than one, we will add another evaluation 
operator in a subsequent pass.
    -        val udf = udfs.head
    -
    -        var evaluation: EvaluatePython = null
    -
    -        // Rewrite the child that has the input required for the UDF
    -        val newChildren = l.children.map { child =>
    -          // Check to make sure that the UDF can be evaluated with only 
the input of this child.
    -          // Other cases are disallowed as they are ambiguous or would 
require a cartisian product.
    -          if (udf.references.subsetOf(child.outputSet)) {
    -            evaluation = EvaluatePython(udf, child)
    -            evaluation
    -          } else if (udf.references.intersect(child.outputSet).nonEmpty) {
    -            sys.error(s"Invalid PythonUDF $udf, requires attributes from 
more than one child.")
    -          } else {
    -            child
    -          }
    +        udfs.find(_.resolved) match {
    +          case Some(udf) =>
    +            var evaluation: EvaluatePython = null
    +
    +            // Rewrite the child that has the input required for the UDF
    +            val newChildren = plan.children.map { child =>
    +              // Check to make sure that the UDF can be evaluated with 
only the input of this child.
    +              // Other cases are disallowed as they are ambiguous or would 
require a cartesian
    +              // product.
    +              if (udf.references.subsetOf(child.outputSet)) {
    +                evaluation = EvaluatePython(udf, child)
    +                evaluation
    +              } else if 
(udf.references.intersect(child.outputSet).nonEmpty) {
    +                sys.error(s"Invalid PythonUDF $udf, requires attributes 
from more than one child.")
    +              } else {
    +                child
    +              }
    +            }
    +
    +            assert(evaluation != null, "Unable to evaluate PythonUDF.  
Missing input attributes.")
    +
    +            // Trim away the new UDF value if it was only used for 
filtering or something.
    +            logical.Project(
    +              plan.output,
    +              plan.transformExpressions {
    +                case p: PythonUDF if p.fastEquals(udf) => 
evaluation.resultAttribute
    +              }.withNewChildren(newChildren))
    +
    +          case None =>
    +            // If there is no Python UDF that is resolved, skip this round.
    --- End diff --
    
    This is required because of the change in function resolution: UDFs can now
have inputs that are not yet resolved.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to