peter-toth commented on code in PR #56244:
URL: https://github.com/apache/spark/pull/56244#discussion_r3499453511
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala:
##########
@@ -564,6 +564,201 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
}
+/**
+ * Decomposes struct-level equality comparisons appearing in Filter conditions into a
+ * conjunction of field-level equalities. This enables per-field filter pushdown to data
+ * sources (Parquet row group skipping, Iceberg/Delta column statistics, partition pruning).
+ *
+ * For a non-nullable struct comparison `struct_col = struct(1, 'a')`, the rewrite produces
+ * `struct_col.field1 = 1 AND struct_col.field2 = 'a'`. When either operand is nullable,
+ * the conjunction is wrapped in a null-check expression that mirrors the original
+ * struct-level comparison's NULL semantics (see comments on `decomposeEqualTo` and
+ * `decomposeEqualNullSafe`).
+ *
+ * Scope: the rule only rewrites comparisons appearing inside `Filter` conditions. It
+ * deliberately does NOT rewrite:
+ *   - Join conditions: an equi-join key on a struct is matched by the planner as a single
+ *     equality and routes to BroadcastHashJoin / SortMergeJoin. Decomposing it would
+ *     break key matching for those join strategies.
+ *   - Aggregate grouping expressions: structurally cannot be reached because the
+ *     transformation is scoped to `case Filter`.
+ *   - Project expressions: outside the pushdown path; rewriting them does not enable
+ *     pushdown and would expand the projection unnecessarily.
+ *
+ * The rewrite is gated on `spark.sql.optimizer.decomposeStructComparison.enabled`
+ * (default off) so users can opt in once the behavior has soaked in their workloads.
+ * The total number of decomposed predicates per top-level comparison is bounded by
+ * `spark.sql.optimizer.decomposeStructComparison.maxFields` to prevent runaway expansion
+ * on deeply nested or wide structs.
+ */
+object DecomposeStructComparison extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.decomposeStructComparisonEnabled) {
+      return plan
+    }
+    plan.transformWithPruning(_.containsPattern(FILTER), ruleId) {
+      case f @ Filter(condition, _) =>
+        // A Filter drops rows whose condition is not TRUE, so at a top-level conjunct
+        // NULL and FALSE are indistinguishable. That lets us emit the bare, *pushable*
+        // field conjunction for a struct comparison sitting directly in the WHERE,
+        // instead of the null-preserving `If(...)` form (which is opaque to filter
+        // pushdown). Comparisons nested under Not/Or/etc. are NOT in filter position --
+        // there NULL vs FALSE is observable -- so those go through the null-preserving
+        // `decomposeCondition` path unchanged.
+        val rewritten = splitConjunctivePredicates(condition).map {
+          case EqualTo(l, r) if canDecompose(l, r) && filterEquivalentToConjunction(l, r) =>
+            // Guard: AND IsNotNull(<nullable operand>) to preserve whole-null vs
+            // all-null-fields semantics. Without it, a whole-null struct row would
+            // pass the fieldConjunction when the literal has all-null fields.
+            val nullGuards = (if (l.nullable) Seq(IsNotNull(l)) else Nil) ++
+              (if (r.nullable) Seq(IsNotNull(r)) else Nil)
+            val conjunction = fieldConjunction(l, r)
+            if (nullGuards.isEmpty) conjunction
+            else (nullGuards :+ conjunction).reduce(And)
+          case EqualNullSafe(l, r) if canDecompose(l, r) =>
Review Comment:
Nit: These two arms (the `EqualTo` case above and this `EqualNullSafe` case)
now have byte-identical bodies -- in Filter position `s = lit` and `s <=> lit`
decompose to the same `IsNotNull-guards AND fieldConjunction`. They could fold
into one:
```scala
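// Scala pattern alternatives cannot bind variables, so match on the shared
// parent type and narrow to the two comparison kinds in the guard.
case cmp: BinaryComparison
    if (cmp.isInstanceOf[EqualTo] || cmp.isInstanceOf[EqualNullSafe]) &&
      canDecompose(cmp.left, cmp.right) &&
      filterEquivalentToConjunction(cmp.left, cmp.right) =>
  val (l, r) = (cmp.left, cmp.right)
  val nullGuards = (if (l.nullable) Seq(IsNotNull(l)) else Nil) ++
    (if (r.nullable) Seq(IsNotNull(r)) else Nil)
  val conjunction = fieldConjunction(l, r)
  if (nullGuards.isEmpty) conjunction else (nullGuards :+ conjunction).reduce(And)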
```
Beyond de-duplication, this closes a small asymmetry. The `EqualTo` arm is
structurally guarded by `filterEquivalentToConjunction` (a both-nullable pair
falls through to the null-preserving `decomposeEqualTo`), but this `<=>` arm
has no such guard: its correctness for a both-nullable pair relies on
`NullPropagation` (ordered before this rule) having already rewritten `<=>
null` to `IsNull`. I verified that this holds today (`s <=> CAST(NULL AS
struct<...>)` optimizes to `isnull(s)`, and both-nullable columns are rejected
by `canDecompose`), so this is hardening, not a bugfix. Still, routing both arms
through the same guard makes the `<=>` path self-evidently correct without
depending on rule ordering. (With the merged arm, a both-nullable `<=>` falls
through to `decomposeEqualNullSafe`, which already handles it.)
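
For readers less familiar with the NULL semantics behind this nit, here is a minimal, self-contained sketch of why `=` and `<=>` against a non-null literal are interchangeable in Filter position but not under `Not`. It is a toy model, not Catalyst code: `FilterPositionSketch` and all of its members are hypothetical names, `None` stands in for SQL NULL, and it uses scalar `Int` values rather than structs, so it deliberately ignores the whole-null vs all-null-fields distinction that the `IsNotNull` guards address.

```scala
// Toy model of SQL three-valued logic (an illustration, not Spark's implementation).
// None stands for SQL NULL.
object FilterPositionSketch {
  type SqlBool = Option[Boolean]

  // `a = b`: the result is NULL if either side is NULL.
  def equalTo(a: Option[Int], b: Option[Int]): SqlBool =
    for (x <- a; y <- b) yield x == y

  // `a <=> b`: never NULL; two NULLs compare as equal.
  def equalNullSafe(a: Option[Int], b: Option[Int]): SqlBool =
    Some(a == b)

  // SQL NOT: NULL stays NULL.
  def not(p: SqlBool): SqlBool = p.map(!_)

  // A Filter keeps a row only when its condition is TRUE,
  // so NULL and FALSE both drop the row.
  def keptByFilter(p: SqlBool): Boolean = p.contains(true)

  def main(args: Array[String]): Unit = {
    val lit: Option[Int] = Some(1)
    val rows: Seq[Option[Int]] = Seq(Some(1), Some(2), None)

    // Directly in filter position the two comparisons keep the same rows.
    println(rows.map(r => keptByFilter(equalTo(r, lit))))
    println(rows.map(r => keptByFilter(equalNullSafe(r, lit))))

    // Under NOT the NULL row becomes observable: the two forms diverge.
    println(rows.map(r => keptByFilter(not(equalTo(r, lit)))))
    println(rows.map(r => keptByFilter(not(equalNullSafe(r, lit)))))
  }
}
```

The first two printed lists agree, while the last two differ only for the NULL row. That is the property that lets the top-level arms share one body while nested comparisons still need the null-preserving path.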