peter-toth commented on code in PR #56244:
URL: https://github.com/apache/spark/pull/56244#discussion_r3499453511


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala:
##########
@@ -564,6 +564,201 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 
+/**
+ * Decomposes struct-level equality comparisons appearing in Filter conditions into a
+ * conjunction of field-level equalities. This enables per-field filter pushdown to data
+ * sources (Parquet row group skipping, Iceberg/Delta column statistics, partition pruning).
+ *
+ * For a non-nullable struct comparison `struct_col = struct(1, 'a')`, the rewrite produces
+ * `struct_col.field1 = 1 AND struct_col.field2 = 'a'`. When either operand is nullable,
+ * the conjunction is wrapped in a null-check expression that mirrors the original
+ * struct-level comparison's NULL semantics (see comments on `decomposeEqualTo` and
+ * `decomposeEqualNullSafe`).
+ *
+ * Scope: the rule only rewrites comparisons appearing inside `Filter` conditions. It
+ * deliberately does NOT rewrite:
+ *   - Join conditions: an equi-join key on a struct is matched by the planner as a single
+ *     equality and routes to BroadcastHashJoin / SortMergeJoin. Decomposing it would
+ *     break key matching for those join strategies.
+ *   - Aggregate grouping expressions: structurally cannot be reached because the
+ *     transformation is scoped to `case Filter`.
+ *   - Project expressions: outside the pushdown path; rewriting them does not enable
+ *     pushdown and would expand the projection unnecessarily.
+ *
+ * The rewrite is gated on `spark.sql.optimizer.decomposeStructComparison.enabled`
+ * (default off) so users can opt in once the behavior has soaked in their workloads.
+ * The total number of decomposed predicates per top-level comparison is bounded by
+ * `spark.sql.optimizer.decomposeStructComparison.maxFields` to prevent runaway expansion
+ * on deeply nested or wide structs.
+ */
+object DecomposeStructComparison extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.decomposeStructComparisonEnabled) {
+      return plan
+    }
+    plan.transformWithPruning(_.containsPattern(FILTER), ruleId) {
+      case f @ Filter(condition, _) =>
+        // A Filter drops rows whose condition is not TRUE, so at a top-level conjunct
+        // NULL and FALSE are indistinguishable. That lets us emit the bare, *pushable*
+        // field conjunction for a struct comparison sitting directly in the WHERE,
+        // instead of the null-preserving `If(...)` form (which is opaque to filter
+        // pushdown). Comparisons nested under Not/Or/etc. are NOT in filter position --
+        // there NULL vs FALSE is observable -- so those go through the null-preserving
+        // `decomposeCondition` path unchanged.
+        val rewritten = splitConjunctivePredicates(condition).map {
+          case EqualTo(l, r) if canDecompose(l, r) && filterEquivalentToConjunction(l, r) =>
+            // Guard: AND IsNotNull(<nullable operand>) to preserve whole-null vs
+            // all-null-fields semantics. Without it, a whole-null struct row would
+            // pass the fieldConjunction when the literal has all-null fields.
+            val nullGuards = (if (l.nullable) Seq(IsNotNull(l)) else Nil) ++
+              (if (r.nullable) Seq(IsNotNull(r)) else Nil)
+            val conjunction = fieldConjunction(l, r)
+            if (nullGuards.isEmpty) conjunction
+            else (nullGuards :+ conjunction).reduce(And)
+          case EqualNullSafe(l, r) if canDecompose(l, r) =>

Review Comment:
   Nit: These two arms (the `EqualTo` case above and this `EqualNullSafe` case) 
now have byte-identical bodies -- in Filter position `s = lit` and `s <=> lit` 
decompose to the same `IsNotNull-guards AND fieldConjunction`. They could fold 
into one:
   
   ```scala
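   // `Equality` (from catalyst's predicates) matches both EqualTo and EqualNullSafe.
   // A plain `EqualTo(l, r) | EqualNullSafe(l, r)` alternative cannot bind `l` and `r`
   // in Scala 2, so the extractor is used instead.
   case Equality(l, r)
       if canDecompose(l, r) && filterEquivalentToConjunction(l, r) =>
     val nullGuards = (if (l.nullable) Seq(IsNotNull(l)) else Nil) ++
       (if (r.nullable) Seq(IsNotNull(r)) else Nil)
     val conjunction = fieldConjunction(l, r)
     if (nullGuards.isEmpty) conjunction else (nullGuards :+ conjunction).reduce(And)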
   ```
   
   Beyond de-duplication, this closes a small asymmetry: the `EqualTo` arm is 
structurally guarded by `filterEquivalentToConjunction` (a both-nullable pair 
falls through to the null-preserving `decomposeEqualTo`), but this `<=>` arm 
has no such guard -- its correctness for a both-nullable pair relies on 
`NullPropagation` (ordered before this rule) having already rewritten `<=> 
null` to `IsNull`. I verified that holds today (`s <=> CAST(NULL AS 
struct<...>)` optimizes to `isnull(s)`, and both-nullable columns are rejected 
by `canDecompose`), so this is hardening, not a bugfix -- but routing both arms 
through the same guard makes the `<=>` path self-evidently correct without 
depending on batch ordering. (A both-nullable `<=>` correctly falls through to 
`decomposeEqualNullSafe`, which already handles it.)
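   
   To make the both-nullable case concrete, here is a minimal standalone sketch. It is a toy model and not the PR's code: `Struct`, `equalToInFilter`, `equalNullSafe`, and `guardedConjunction` are hypothetical names, a whole-NULL struct is modelled as `None`, and fields are assumed non-null. It compares what a Filter observes for `=`, for `<=>`, and for the `IsNotNull`-guarded conjunction:
   
   ```scala
   object NullSemanticsSketch {
     // Hypothetical toy model: None stands for a whole-NULL struct value.
     type Struct = Option[(Int, String)]
   
     // `l = r` as a Filter sees it: a NULL result drops the row, same as FALSE.
     def equalToInFilter(l: Struct, r: Struct): Boolean =
       (for (a <- l; b <- r) yield a == b).getOrElse(false)
   
     // `l <=> r`: never NULL, and NULL <=> NULL is TRUE.
     def equalNullSafe(l: Struct, r: Struct): Boolean = l == r
   
     // IsNotNull(l) AND IsNotNull(r) AND <field-by-field equality>.
     def guardedConjunction(l: Struct, r: Struct): Boolean = (l, r) match {
       case (Some((l1, l2)), Some((r1, r2))) => l1 == r1 && l2 == r2
       case _ => false
     }
   
     val samples: Seq[Struct] = Seq(Some((1, "a")), Some((2, "a")), None)
   
     // Operand pairs where `<=>` and the guarded conjunction disagree.
     val mismatches: Seq[(Struct, Struct)] =
       for {
         l <- samples
         r <- samples
         if equalNullSafe(l, r) != guardedConjunction(l, r)
       } yield (l, r)
   
     def main(args: Array[String]): Unit =
       println(mismatches)
   }
   ```
   
   In this model `=` in filter position agrees with the guarded conjunction for every pair, while `<=>` disagrees only on the (NULL, NULL) pair, which is the case the shared guard is meant to keep out of the fast path.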



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]