AngersZhuuuu commented on a change in pull request #30212:
URL: https://github.com/apache/spark/pull/30212#discussion_r533152103
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -2027,65 +1952,76 @@ class Analyzer(override val catalogManager: CatalogManager)
    */
   object ResolveFunctions extends Rule[LogicalPlan] {
     val trimWarningEnabled = new AtomicBoolean(true)
+
+    def resolveFunction(): PartialFunction[Expression, Expression] = {
+      case u if !u.childrenResolved => u // Skip until children are resolved.
+      case u: UnresolvedAttribute if resolver(u.name, VirtualColumn.hiveGroupingIdName) =>
+        withPosition(u) {
+          Alias(GroupingID(Nil), VirtualColumn.hiveGroupingIdName)()
+        }
+      case u @ UnresolvedGenerator(name, children) =>
+        withPosition(u) {
+          v1SessionCatalog.lookupFunction(name, children) match {
+            case generator: Generator => generator
+            case other =>
+              failAnalysis(s"$name is expected to be a generator. However, " +
+                s"its class is ${other.getClass.getCanonicalName}, which is not a generator.")
+          }
+        }
+      case u @ UnresolvedFunction(funcId, arguments, isDistinct, filter) =>
+        withPosition(u) {
+          v1SessionCatalog.lookupFunction(funcId, arguments) match {
+            // AggregateWindowFunctions are AggregateFunctions that can only be evaluated within
+            // the context of a Window clause. They do not need to be wrapped in an
+            // AggregateExpression.
+            case wf: AggregateWindowFunction =>
+              if (isDistinct || filter.isDefined) {
+                failAnalysis("DISTINCT or FILTER specified, " +
+                  s"but ${wf.prettyName} is not an aggregate function")
+              } else {
+                wf
+              }
+            // We get an aggregate function, we need to wrap it in an AggregateExpression.
+            case agg: AggregateFunction =>
+              if (filter.isDefined && !filter.get.deterministic) {
+                failAnalysis("FILTER expression is non-deterministic, " +
+                  "it cannot be used in aggregate functions")
+              }
+              AggregateExpression(agg, Complete, isDistinct, filter)
+            // This function is not an aggregate function, just return the resolved one.
+            case other if (isDistinct || filter.isDefined) =>
+              failAnalysis("DISTINCT or FILTER specified, " +
+                s"but ${other.prettyName} is not an aggregate function")
+            case e: String2TrimExpression if arguments.size == 2 =>
+              if (trimWarningEnabled.get) {
+                logWarning("Two-parameter TRIM/LTRIM/RTRIM function signatures are deprecated." +
+                  " Use SQL syntax `TRIM((BOTH | LEADING | TRAILING)? trimStr FROM str)`" +
+                  " instead.")
+                trimWarningEnabled.set(false)
+              }
+              e
+            case other =>
+              other
+          }
+        }
+    }
+
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
       // Resolve functions with concrete relations from v2 catalog.
       case UnresolvedFunc(multipartIdent) =>
         val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent)
         ResolvedFunc(Identifier.of(funcIdent.database.toArray, funcIdent.funcName))
-      case q: LogicalPlan =>
-        q transformExpressions {
-          case u if !u.childrenResolved => u // Skip until children are resolved.
-          case u: UnresolvedAttribute if resolver(u.name, VirtualColumn.hiveGroupingIdName) =>
-            withPosition(u) {
-              Alias(GroupingID(Nil), VirtualColumn.hiveGroupingIdName)()
-            }
-          case u @ UnresolvedGenerator(name, children) =>
-            withPosition(u) {
-              v1SessionCatalog.lookupFunction(name, children) match {
-                case generator: Generator => generator
-                case other =>
-                  failAnalysis(s"$name is expected to be a generator. However, " +
-                    s"its class is ${other.getClass.getCanonicalName}, which is not a generator.")
-              }
-            }
-          case u @ UnresolvedFunction(funcId, arguments, isDistinct, filter) =>
-            withPosition(u) {
-              v1SessionCatalog.lookupFunction(funcId, arguments) match {
-                // AggregateWindowFunctions are AggregateFunctions that can only be evaluated within
-                // the context of a Window clause. They do not need to be wrapped in an
-                // AggregateExpression.
-                case wf: AggregateWindowFunction =>
-                  if (isDistinct || filter.isDefined) {
-                    failAnalysis("DISTINCT or FILTER specified, " +
-                      s"but ${wf.prettyName} is not an aggregate function")
-                  } else {
-                    wf
-                  }
-                // We get an aggregate function, we need to wrap it in an AggregateExpression.
-                case agg: AggregateFunction =>
-                  if (filter.isDefined && !filter.get.deterministic) {
-                    failAnalysis("FILTER expression is non-deterministic, " +
-                      "it cannot be used in aggregate functions")
-                  }
-                  AggregateExpression(agg, Complete, isDistinct, filter)
-                // This function is not an aggregate function, just return the resolved one.
-                case other if (isDistinct || filter.isDefined) =>
-                  failAnalysis("DISTINCT or FILTER specified, " +
-                    s"but ${other.prettyName} is not an aggregate function")
-                case e: String2TrimExpression if arguments.size == 2 =>
-                  if (trimWarningEnabled.get) {
-                    log.warn("Two-parameter TRIM/LTRIM/RTRIM function signatures are deprecated." +
-                      " Use SQL syntax `TRIM((BOTH | LEADING | TRAILING)? trimStr FROM str)`" +
-                      " instead.")
-                    trimWarningEnabled.set(false)
-                  }
-                  e
-                case other =>
-                  other
-              }
-            }
+      case a: Aggregate =>
+        val newGroups = a.groupingExpressions.map {
+          case gs: GroupingSet =>
+            gs.withNewChildren(gs.children.map(_.transformDown(resolveFunction)))
+          case e => e
         }
+        a.copy(groupingExpressions = newGroups) transformExpressions resolveFunction
Review comment:
> I think it's better not to make the existing Analyzer logic complicated where possible. Handling all the cases in `case e => resolveFunction(e)` looks best.

How about the current approach? I have no idea how to integrate `resolveExpr` and `resolveFunction`.
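
For readers skimming the thread, here is a minimal, self-contained sketch of the pattern under discussion: one `PartialFunction` that performs the resolution, reused both for ordinary top-down expression traversal and for the children of a grouping-set node, mirroring `gs.withNewChildren(gs.children.map(_.transformDown(resolveFunction)))` in the diff. The `Expr`, `UnresolvedCall`, `ResolvedCall`, `GroupingSetLike`, and `transformDown` names are toy stand-ins for illustration only, not the real Catalyst API.

```scala
// Toy sketch only: Expr, UnresolvedCall, ResolvedCall, GroupingSetLike and
// transformDown are illustrative stand-ins, not Catalyst classes.
sealed trait Expr {
  def children: Seq[Expr]
  def withNewChildren(newChildren: Seq[Expr]): Expr
  // Apply the rule to this node first, then recurse into the children (top-down).
  def transformDown(rule: PartialFunction[Expr, Expr]): Expr = {
    val applied = if (rule.isDefinedAt(this)) rule(this) else this
    applied.withNewChildren(applied.children.map(_.transformDown(rule)))
  }
}

case class Literal(value: Int) extends Expr {
  def children: Seq[Expr] = Nil
  def withNewChildren(newChildren: Seq[Expr]): Expr = this
}

case class UnresolvedCall(name: String, args: Seq[Expr]) extends Expr {
  def children: Seq[Expr] = args
  def withNewChildren(newChildren: Seq[Expr]): Expr = copy(args = newChildren)
}

case class ResolvedCall(name: String, args: Seq[Expr]) extends Expr {
  def children: Seq[Expr] = args
  def withNewChildren(newChildren: Seq[Expr]): Expr = copy(args = newChildren)
}

// Stand-in for a GroupingSet: its children are the grouping expressions.
case class GroupingSetLike(children: Seq[Expr]) extends Expr {
  def withNewChildren(newChildren: Seq[Expr]): Expr = copy(children = newChildren)
}

object ResolveSketch {
  // A single resolution rule, analogous to resolveFunction() in the diff.
  val resolveFunction: PartialFunction[Expr, Expr] = {
    case UnresolvedCall(name, args) => ResolvedCall(name, args)
  }

  def main(args: Array[String]): Unit = {
    val gs = GroupingSetLike(Seq(UnresolvedCall("f", Seq(Literal(1)))))
    // Same shape as the Aggregate case in the diff: resolve inside the grouping set.
    val resolved = gs.withNewChildren(gs.children.map(_.transformDown(resolveFunction)))
    println(resolved) // GroupingSetLike(List(ResolvedCall(f,List(Literal(1)))))
  }
}
```

In the real Analyzer the same idea would presumably let a single rule (applied via `transformExpressions resolveFunction` and, separately, to grouping-set children) cover both paths, which seems to be the integration the comment above is asking about.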
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]