cloud-fan commented on code in PR #49471:
URL: https://github.com/apache/spark/pull/49471#discussion_r1928172054
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2636,6 +2637,93 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
}
}
+ /*
+ * This rule resolves SQL table functions.
+ */
+ object ResolveSQLTableFunctions extends Rule[LogicalPlan] with AliasHelper {
+
+ /**
+ * Check if a subquery plan is subject to the COUNT bug that can cause
wrong results.
+ * A lateral correlation can only be removed if the lateral subquery is
not subject to
+ * the COUNT bug. Currently only lateral correlation can handle it
correctly.
+ */
+ private def hasCountBug(sub: LogicalPlan): Boolean = sub.find {
+ // The COUNT bug occurs when there is an Aggregate that satisfies all
the following
+ // conditions:
+ // 1) is on the correlation path
+ // 2) has non-empty group by expressions
+ // 3) has one or more output columns that evaluate to non-null values
with empty input.
+ // E.g: COUNT(empty row) = 0.
+ // For simplicity, we use a stricter criteria (1 and 2 only) to
determine if a query
+ // is subject to the COUNT bug.
+ case a: Aggregate if a.groupingExpressions.nonEmpty =>
hasOuterReferences(a.child)
+ case _ => false
+ }.nonEmpty
+
+ /**
+ * Rewrite a resolved SQL table function plan by removing unnecessary
lateral joins:
+ * Before:
+ * LateralJoin lateral-subquery [a], Inner
+ * : +- Project [c1, c2]
+ * : +- Filter [outer(a) == c1]
+ * : +- Relation [c1, c2]
+ * +- Project [1 AS a]
+ * +- OneRowRelation
+ * After:
+ * Project [c1, c2]
+ * +- Filter [1 == c1] <---- Replaced outer(a)
+ * +- Relation [c1, c2]
+ */
+ private def rewrite(plan: LogicalPlan): LogicalPlan = {
+ (plan transformUp {
+ case j @ LateralJoin(Project(aliases, _: OneRowRelation), sub:
LateralSubquery, Inner, None)
+ if j.resolved && aliases.forall(_.deterministic) =>
+ val attrMap = AttributeMap(aliases.collect { case a: Alias =>
a.toAttribute -> a.child })
+ val newPlan = sub.plan.transformAllExpressionsWithPruning(
+ _.containsPattern(OUTER_REFERENCE)) {
+ // Avoid replacing outer references that do not belong to the
current outer plan.
+ // This can happen if the child of an alias also contains outer
references (nested
+ // table function references). E.g:
+ // LateralJoin
+ // : +- Filter [outer(a) == x]
+ // : +- Relation [x, y]
+ // +- Project [outer(c) AS a]
+ // +- OneRowRelation
+ case OuterReference(a: Attribute) if attrMap.contains(a) =>
attrMap(a) match {
+ case ne: NamedExpression => ne
+ case o => Alias(o, a.name)(exprId = a.exprId, qualifier =
a.qualifier)
Review Comment:
Never mind: this is the inner alias, and we cannot tell here whether it is safe
to omit it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]