yyanyy commented on code in PR #56244:
URL: https://github.com/apache/spark/pull/56244#discussion_r3359726511
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala:
##########
@@ -564,6 +564,46 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
}
+/**
+ * Decomposes struct-level equality comparisons into conjunctions of field-level equalities.
+ * This enables filter pushdown for individual struct fields.
+ * For example, `struct_col = struct(1, 'a')` becomes
+ * `struct_col.field1 = 1 AND struct_col.field2 = 'a'`.
+ */
+object DecomposeStructComparison extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
Review Comment:
I'm a little worried about the combination of (1) recursively transforming
nested structs and (2) unconditionally AND'ing all fields of a struct together,
regardless of how many fields the struct holds. Together these could cause a
stack overflow for deeply nested and/or very large structs and, in the worst
case, pose a DoS threat to the hosting system. Should we guard against this by
setting an upper limit, and/or put the feature behind a config flag to cover
both this and the issue below?
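
To make the suggestion concrete, here is a minimal, self-contained sketch of
such a guard. It does not use Spark's classes: `DType`, `Atom`, and `Struct` are
toy stand-ins for `DataType`/`StructType`, and `MaxLeafComparisons` and
`MaxDepth` are hypothetical limits that a real rule would presumably read from
a config entry. The idea is to check the size of the would-be decomposition
before doing any rewriting.

```scala
// Toy model of a data type; stands in for Spark's DataType / StructType.
sealed trait DType
case object Atom extends DType
final case class Struct(fields: Vector[DType]) extends DType

object DecomposeGuard {
  // Hypothetical limits (assumptions, not existing Spark configs).
  val MaxLeafComparisons = 100
  val MaxDepth = 10

  // Counts the field-level comparisons a full decomposition would emit.
  // Returns None as soon as either limit is exceeded, so the recursion
  // itself never goes deeper than MaxDepth.
  private def leaves(t: DType, depth: Int): Option[Int] = t match {
    case Atom => Some(1)
    case Struct(_) if depth >= MaxDepth => None
    case Struct(fs) =>
      fs.foldLeft(Option(0)) { (acc, f) =>
        for {
          n <- acc
          m <- leaves(f, depth + 1)
          if n + m <= MaxLeafComparisons
        } yield n + m
      }
  }

  // Only non-empty structs within both limits are eligible.
  def shouldDecompose(t: DType): Boolean = t match {
    case s: Struct => s.fields.nonEmpty && leaves(s, 0).isDefined
    case Atom => false
  }
}
```

In the rule under review, a check like this could be one more condition in
`canDecompose`, so oversized or deeply nested structs are simply left as
whole-struct comparisons.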
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala:
##########
@@ -564,6 +564,46 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
}
+/**
+ * Decomposes struct-level equality comparisons into conjunctions of field-level equalities.
+ * This enables filter pushdown for individual struct fields.
+ * For example, `struct_col = struct(1, 'a')` becomes
+ * `struct_col.field1 = 1 AND struct_col.field2 = 'a'`.
+ */
+object DecomposeStructComparison extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+ _.containsPattern(FILTER), ruleId) {
+ case f @ Filter(condition, _) =>
+ f.copy(condition = decomposeCondition(condition))
+ }
+
+ private def decomposeCondition(expr: Expression): Expression = expr.transformWithPruning(
+ _.containsPattern(BINARY_COMPARISON)) {
+ case EqualTo(left, right) if canDecompose(left, right) =>
+ decompose(left, right, EqualTo)
+ case EqualNullSafe(left, right) if canDecompose(left, right) =>
+ decompose(left, right, EqualNullSafe)
+ }
+
+ private def canDecompose(left: Expression, right: Expression): Boolean = {
+ (left.dataType, right.dataType) match {
+ case (l: StructType, r: StructType) =>
+ l.length > 0 && l.length == r.length && left.deterministic && right.deterministic
+ case _ => false
+ }
+ }
+
+ private def decompose(
+ left: Expression,
+ right: Expression,
+ cmp: (Expression, Expression) => Expression): Expression = {
+ val fields = left.dataType.asInstanceOf[StructType].fields
+ fields.indices.map { i =>
+ cmp(GetStructField(left, i), GetStructField(right, i))
Review Comment:
There's apparently a subtle case in Spark: while `1 = NULL` returns null,
`struct(1, null) = struct(1, null)` returns true. With this code change, that
behavior is lost. I think we need to be careful with null handling in general.
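
To illustrate the divergence, here is a small self-contained model, not Spark
code. It assumes the whole-struct comparison treats two null fields as equal
(as described above) and that field-level `=` and `AND` follow SQL three-valued
logic. All names (`SqlBool`, `Field`, `sqlEq`, `sqlEqNullSafe`, `structEq`,
`decomposed`) are made up for the illustration.

```scala
object NullSemantics {
  // SQL boolean: Some(true), Some(false), or None for NULL.
  type SqlBool = Option[Boolean]
  // A nullable integer field; None models a NULL value.
  type Field = Option[Int]

  // Field-level `=`: NULL if either side is NULL.
  def sqlEq(a: Field, b: Field): SqlBool =
    for (x <- a; y <- b) yield x == y

  // Field-level `<=>`: never NULL, and NULL <=> NULL is true.
  def sqlEqNullSafe(a: Field, b: Field): SqlBool = Some(a == b)

  // Three-valued AND.
  def and(a: SqlBool, b: SqlBool): SqlBool = (a, b) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true)) => Some(true)
    case _ => None
  }

  // Whole-struct comparison as described in the comment above:
  // null fields compare as equal (assumption of this model).
  def structEq(l: Vector[Field], r: Vector[Field]): SqlBool = Some(l == r)

  // What the decomposition produces: field-wise cmp joined by AND.
  def decomposed(
      l: Vector[Field],
      r: Vector[Field],
      cmp: (Field, Field) => SqlBool): SqlBool =
    l.zip(r).map { case (a, b) => cmp(a, b) }.foldLeft(Some(true): SqlBool)(and)
}
```

For `s = struct(1, null)`, this model gives `structEq(s, s)` as true but
`decomposed(s, s, sqlEq)` as NULL, which is the regression. Comparing fields
with `sqlEqNullSafe` agrees with the whole-struct result in this particular
case, but that alone would not cover a struct that is itself null, so it is
only a direction to investigate rather than a verified fix.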
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]