Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6712#discussion_r32092072
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala ---
    @@ -71,43 +71,49 @@ private[spark] case class PythonUDF(
     private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] {
       def apply(plan: LogicalPlan): LogicalPlan = plan transform {
         // Skip EvaluatePython nodes.
    -    case p: EvaluatePython => p
    +    case plan: EvaluatePython => plan
     
    -    case l: LogicalPlan =>
    +    case plan: LogicalPlan =>
           // Extract any PythonUDFs from the current operator.
    -      val udfs = l.expressions.flatMap(_.collect { case udf: PythonUDF => 
udf})
    +      val udfs = plan.expressions.flatMap(_.collect { case udf: PythonUDF 
=> udf })
           if (udfs.isEmpty) {
             // If there aren't any, we are done.
    -        l
    +        plan
           } else {
             // Pick the UDF we are going to evaluate (TODO: Support evaluating 
multiple UDFs at a time)
             // If there is more than one, we will add another evaluation 
operator in a subsequent pass.
    -        val udf = udfs.head
    -
    -        var evaluation: EvaluatePython = null
    -
    -        // Rewrite the child that has the input required for the UDF
    -        val newChildren = l.children.map { child =>
    -          // Check to make sure that the UDF can be evaluated with only 
the input of this child.
    -          // Other cases are disallowed as they are ambiguous or would 
require a cartisian product.
    -          if (udf.references.subsetOf(child.outputSet)) {
    -            evaluation = EvaluatePython(udf, child)
    -            evaluation
    -          } else if (udf.references.intersect(child.outputSet).nonEmpty) {
    -            sys.error(s"Invalid PythonUDF $udf, requires attributes from 
more than one child.")
    -          } else {
    -            child
    -          }
    +        udfs.find(_.resolved) match {
    +          case Some(udf) =>
    +            var evaluation: EvaluatePython = null
    +
    +            // Rewrite the child that has the input required for the UDF
    +            val newChildren = plan.children.map { child =>
    +              // Check to make sure that the UDF can be evaluated with 
only the input of this child.
    +              // Other cases are disallowed as they are ambiguous or would 
require a cartesian
    +              // product.
    +              if (udf.references.subsetOf(child.outputSet)) {
    +                evaluation = EvaluatePython(udf, child)
    +                evaluation
    +              } else if 
(udf.references.intersect(child.outputSet).nonEmpty) {
    +                sys.error(s"Invalid PythonUDF $udf, requires attributes 
from more than one child.")
    --- End diff --
    
    Prefer `AnalysisException` here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to