yyanyy commented on code in PR #56244:
URL: https://github.com/apache/spark/pull/56244#discussion_r3359726511


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala:
##########
@@ -564,6 +564,46 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 
+/**
+ * Decomposes struct-level equality comparisons into conjunctions of field-level equalities.
+ * This enables filter pushdown for individual struct fields.
+ * For example, `struct_col = struct(1, 'a')` becomes
+ * `struct_col.field1 = 1 AND struct_col.field2 = 'a'`.
+ */
+object DecomposeStructComparison extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(

Review Comment:
   I'm a little worried about the combination of (1) recursively transforming 
nested structs and (2) unconditionally AND'ing together every field of a 
struct, no matter how many fields it holds. For a deeply nested and/or very 
wide struct, this could cause a stack overflow error and, in the worst case, 
pose a DoS threat to the hosting system. Should we guard against this by 
setting an upper limit, and/or put the whole feature behind a config flag, 
which would also cover the issue below?
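
   A minimal sketch of the kind of guard I have in mind. It uses a toy type 
model instead of Catalyst's `DataType`, and the names `DecomposeGuard`, 
`MaxLeafComparisons` and `MaxDepth` as well as the limit values are 
hypothetical, not existing Spark APIs or configs:

```scala
// Toy stand-ins for Catalyst data types (assumption: illustration only).
sealed trait DataType
case object IntType extends DataType
case object StringType extends DataType
final case class StructType(fields: Vector[DataType]) extends DataType

object DecomposeGuard {
  // Hypothetical limits; in a real rule these would likely come from a config.
  val MaxLeafComparisons = 100
  val MaxDepth = 8

  /**
   * Counts the field-level comparisons that decomposing a value of type `dt`
   * would produce. Returns None as soon as the struct nesting reaches
   * MaxDepth or the running total exceeds MaxLeafComparisons.
   */
  def leafCount(dt: DataType, depth: Int = 0): Option[Int] = dt match {
    case StructType(fields) =>
      if (depth >= MaxDepth) None
      else fields.foldLeft(Option(0)) { (acc, field) =>
        acc.flatMap { soFar =>
          leafCount(field, depth + 1).map(_ + soFar).filter(_ <= MaxLeafComparisons)
        }
      }
    case _ => Some(1) // a non-struct field yields exactly one comparison
  }

  /** True when decomposition stays within both limits. */
  def withinLimits(dt: DataType): Boolean = leafCount(dt).isDefined
}
```

   Something like `withinLimits` could be checked in `canDecompose`, so that 
oversized structs are left as a single struct-level comparison.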



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala:
##########
@@ -564,6 +564,46 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 
+/**
+ * Decomposes struct-level equality comparisons into conjunctions of field-level equalities.
+ * This enables filter pushdown for individual struct fields.
+ * For example, `struct_col = struct(1, 'a')` becomes
+ * `struct_col.field1 = 1 AND struct_col.field2 = 'a'`.
+ */
+object DecomposeStructComparison extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(FILTER), ruleId) {
+    case f @ Filter(condition, _) =>
+      f.copy(condition = decomposeCondition(condition))
+  }
+
+  private def decomposeCondition(expr: Expression): Expression = expr.transformWithPruning(
+    _.containsPattern(BINARY_COMPARISON)) {
+    case EqualTo(left, right) if canDecompose(left, right) =>
+      decompose(left, right, EqualTo)
+    case EqualNullSafe(left, right) if canDecompose(left, right) =>
+      decompose(left, right, EqualNullSafe)
+  }
+
+  private def canDecompose(left: Expression, right: Expression): Boolean = {
+    (left.dataType, right.dataType) match {
+      case (l: StructType, r: StructType) =>
+        l.length > 0 && l.length == r.length && left.deterministic && right.deterministic
+      case _ => false
+    }
+  }
+
+  private def decompose(
+      left: Expression,
+      right: Expression,
+      cmp: (Expression, Expression) => Expression): Expression = {
+    val fields = left.dataType.asInstanceOf[StructType].fields
+    fields.indices.map { i =>
+      cmp(GetStructField(left, i), GetStructField(right, i))

Review Comment:
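   Apparently there's a subtle case in Spark: while `1 = NULL` returns null, 
`struct(1, null) = struct(1, null)` returns true. With this change, that 
behavior is lost, because the decomposed form evaluates `null = null` for the 
second field, and `true AND null` is null rather than true. I think we will 
need to be careful with null handling in general.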
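
   To make the difference concrete, here is a small self-contained model of 
the two evaluations. It is only a sketch: `StructNullSemantics` and its 
helpers are hypothetical names, `Option` stands in for SQL NULL, and 
`structEq` simply encodes the struct-level behavior described above rather 
than calling into Spark:

```scala
object StructNullSemantics {
  type Field = Option[Int]   // None models a NULL field value
  type Tri = Option[Boolean] // None models a NULL boolean result

  // Field-level `=`: the result is NULL if either side is NULL.
  def fieldEq(a: Field, b: Field): Tri =
    for (x <- a; y <- b) yield x == y

  // Three-valued AND: false dominates, otherwise NULL is contagious.
  def and(a: Tri, b: Tri): Tri = (a, b) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None
  }

  // Struct-level `=` as described in this comment (assumption: NULL fields
  // in the same position compare as equal).
  def structEq(l: Vector[Field], r: Vector[Field]): Tri = Some(l == r)

  // What the rewrite produces: a conjunction of field-level `=` comparisons.
  def decomposedEq(l: Vector[Field], r: Vector[Field]): Tri =
    l.zip(r).map { case (a, b) => fieldEq(a, b) }.foldLeft[Tri](Some(true))(and)
}
```

   In this model, for `s = Vector(Some(1), None)`, `structEq(s, s)` is 
`Some(true)` while `decomposedEq(s, s)` is `None`, so a filter on the 
decomposed condition would drop a row that the original condition keeps.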



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

