cloud-fan commented on a change in pull request #32082:
URL: https://github.com/apache/spark/pull/32082#discussion_r621219144
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -2095,9 +2104,101 @@ class Analyzer(override val catalogManager: CatalogManager)
case other =>
other
}
+ }
+
+      case u @ UnresolvedFunction(nameParts, arguments, isDistinct, filter, ignoreNulls) =>
+        withPosition(u) {
+          expandIdentifier(nameParts) match {
+            case NonSessionCatalogAndIdentifier(catalog, ident) =>
+              if (!catalog.isFunctionCatalog) {
+                throw new AnalysisException(s"Trying to lookup function '$ident' in " +
+                  s"catalog '${catalog.name()}', but it is not a FunctionCatalog.")
+              }
+
+              val unbound = catalog.asFunctionCatalog.loadFunction(ident)
+              val inputType = StructType(arguments.zipWithIndex.map {
+                case (exp, pos) => StructField(s"_$pos", exp.dataType, exp.nullable)
+              })
+              val bound = try {
+                unbound.bind(inputType)
+              } catch {
+                case unsupported: UnsupportedOperationException =>
+                  throw new AnalysisException(s"Function '${unbound.name}' cannot process " +
+                    s"input: (${arguments.map(_.dataType.simpleString).mkString(", ")}): " +
+                    unsupported.getMessage, cause = Some(unsupported))
+              }
+
+              bound match {
+                case scalarFunc: ScalarFunction[_] =>
+                  if (isDistinct) {
+                    throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+                      scalarFunc.name(), "DISTINCT")
+                  } else if (filter.isDefined) {
+                    throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+                      scalarFunc.name(), "FILTER clause")
+                  } else if (ignoreNulls) {
+                    throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+                      scalarFunc.name(), "IGNORE NULLS")
+                  } else {
+                    // TODO: implement type coercion by looking at the input type from the UDF.
+                    //  We may also want to check if the parameter types from the magic method
+                    //  match the input type through `BoundFunction.inputTypes`.
+                    val argClasses = inputType.fields.map(_.dataType)
+                    findMethod(scalarFunc, MAGIC_METHOD_NAME, argClasses) match {
+                      case Some(_) =>
+                        val caller = Literal.create(scalarFunc, ObjectType(scalarFunc.getClass))
+                        Invoke(caller, MAGIC_METHOD_NAME, scalarFunc.resultType(),
+                          arguments, returnNullable = scalarFunc.isResultNullable)
+                      case _ =>
+                        // TODO: handle functions defined in Scala too - in Scala, even if a
+                        //  subclass does not override the default method in the parent
+                        //  interface defined in Java, the method can still be found from
+                        //  `getDeclaredMethod`.
+                        // Since `inputType` is a `StructType`, it is mapped to an `InternalRow`,
+                        // which we can use to look up the `produceResult` method.
+                        findMethod(scalarFunc, "produceResult", Seq(inputType)) match {
+                          case Some(_) =>
+                            ApplyFunctionExpression(scalarFunc, arguments)
+                          case None =>
+                            failAnalysis(s"ScalarFunction '${bound.name()}' neither implements" +
+                              s" the magic method nor overrides 'produceResult'")
+                        }
+                    }
+                  }
+                case aggFunc: V2AggregateFunction[_, _] =>
##########
Review comment:
ditto, put into a new method.
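
A rough sketch of what that extraction could look like, purely for illustration: the helper name `resolveV2ScalarFunction` is a placeholder, and the sketch assumes `findMethod`, `MAGIC_METHOD_NAME`, `failAnalysis`, and the v2 function classes used in this diff are already in scope inside the rule.

```scala
// Sketch only: pull the ScalarFunction branch of the `bound match` above into a
// private helper so the big match stays readable. Placeholder name, not from the PR.
private def resolveV2ScalarFunction(
    scalarFunc: ScalarFunction[_],
    inputType: StructType,
    arguments: Seq[Expression],
    isDistinct: Boolean,
    filter: Option[Expression],
    ignoreNulls: Boolean): Expression = {
  if (isDistinct) {
    throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
      scalarFunc.name(), "DISTINCT")
  } else if (filter.isDefined) {
    throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
      scalarFunc.name(), "FILTER clause")
  } else if (ignoreNulls) {
    throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
      scalarFunc.name(), "IGNORE NULLS")
  } else {
    val argClasses = inputType.fields.map(_.dataType)
    findMethod(scalarFunc, MAGIC_METHOD_NAME, argClasses) match {
      case Some(_) =>
        // The magic method exists: invoke it directly on the function instance.
        val caller = Literal.create(scalarFunc, ObjectType(scalarFunc.getClass))
        Invoke(caller, MAGIC_METHOD_NAME, scalarFunc.resultType(),
          arguments, returnNullable = scalarFunc.isResultNullable)
      case _ =>
        // Fall back to the row-based produceResult(InternalRow) entry point.
        findMethod(scalarFunc, "produceResult", Seq(inputType)) match {
          case Some(_) =>
            ApplyFunctionExpression(scalarFunc, arguments)
          case None =>
            failAnalysis(s"ScalarFunction '${scalarFunc.name()}' neither implements" +
              " the magic method nor overrides 'produceResult'")
        }
    }
  }
}
```

The `case scalarFunc: ScalarFunction[_]` branch would then reduce to a single call such as `resolveV2ScalarFunction(scalarFunc, inputType, arguments, isDistinct, filter, ignoreNulls)`, and the `V2AggregateFunction` branch could get a similar helper.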
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]