cloud-fan commented on a change in pull request #32082:
URL: https://github.com/apache/spark/pull/32082#discussion_r621219144



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -2095,9 +2104,101 @@ class Analyzer(override val catalogManager: 
CatalogManager)
                 case other =>
                   other
               }
+          }
+
+          case u @ UnresolvedFunction(nameParts, arguments, isDistinct, 
filter, ignoreNulls) =>
+            withPosition(u) {
+              expandIdentifier(nameParts) match {
+                case NonSessionCatalogAndIdentifier(catalog, ident) =>
+                  if (!catalog.isFunctionCatalog) {
+                    throw new AnalysisException(s"Trying to lookup function 
'$ident' in " +
+                      s"catalog '${catalog.name()}', but it is not a 
FunctionCatalog.")
+                  }
+
+                  val unbound = catalog.asFunctionCatalog.loadFunction(ident)
+                  val inputType = StructType(arguments.zipWithIndex.map {
+                    case (exp, pos) => StructField(s"_$pos", exp.dataType, 
exp.nullable)
+                  })
+                  val bound = try {
+                    unbound.bind(inputType)
+                  } catch {
+                    case unsupported: UnsupportedOperationException =>
+                      throw new AnalysisException(s"Function '${unbound.name}' 
cannot process " +
+                        s"input: 
(${arguments.map(_.dataType.simpleString).mkString(", ")}): " +
+                        unsupported.getMessage, cause = Some(unsupported))
+                  }
+
+                  bound match {
+                    case scalarFunc: ScalarFunction[_] =>
+                      if (isDistinct) {
+                        throw 
QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+                          scalarFunc.name(), "DISTINCT")
+                      } else if (filter.isDefined) {
+                        throw 
QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+                          scalarFunc.name(), "FILTER clause")
+                      } else if (ignoreNulls) {
+                        throw 
QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+                          scalarFunc.name(), "IGNORE NULLS")
+                      } else {
+                        // TODO: implement type coercion by looking at input 
type from the UDF. We
+                        //  may also want to check if the parameter types from 
the magic method
+                        //  match the input type through 
`BoundFunction.inputTypes`.
+                        val argClasses = inputType.fields.map(_.dataType)
+                        findMethod(scalarFunc, MAGIC_METHOD_NAME, argClasses) 
match {
+                          case Some(_) =>
+                            val caller = Literal.create(scalarFunc, 
ObjectType(scalarFunc.getClass))
+                            Invoke(caller, MAGIC_METHOD_NAME, 
scalarFunc.resultType(),
+                              arguments, returnNullable = 
scalarFunc.isResultNullable)
+                          case _ =>
+                            // TODO: handle functions defined in Scala too - 
in Scala, even if a
+                            //  subclass do not override the default method in 
parent interface
+                            //  defined in Java, the method can still be found 
from
+                            //  `getDeclaredMethod`.
+                            // since `inputType` is a `StructType`, it is 
mapped to a `InternalRow`
+                            // which we can use to lookup the `produceResult` 
method.
+                            findMethod(scalarFunc, "produceResult", 
Seq(inputType)) match {
+                              case Some(_) =>
+                                ApplyFunctionExpression(scalarFunc, arguments)
+                              case None =>
+                                failAnalysis(s"ScalarFunction 
'${bound.name()}' neither implement" +
+                                  s" magic method nor override 
'produceResult'")
+                            }
+                        }
+                      }
+                    case aggFunc: V2AggregateFunction[_, _] =>

Review comment:
       Ditto: please move the handling of this `V2AggregateFunction` case into a new method as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to