LuciferYang commented on PR #56264:
URL: https://github.com/apache/spark/pull/56264#issuecomment-4627713124

   @peter-toth Sorry for missing PR #37711 earlier, and many thanks for your 
review and suggestions. You're right that with the body on `FileScan`, only 
the built-in file formats get parity, and that in-tree `JDBCScan` plus every 
external connector would each have to re-implement `mergeWith`. Generalizing 
"who pays the cost" the way Option B does is clearly the better long-term 
direction, and the `hasMergeBlockingPushdown` idea is something I'd want 
regardless of which shape we land on. It collapses the format-specific hooks 
the PR currently carries -- `hasAggregatePushedDown` and Parquet's 
`canMergeScanStateWith` / `pushedVariantExtractions` check -- into a single 
tracked flag, and it's independently useful for explain output and the 
constraint-propagation TODO already sitting on `DataSourceV2ScanRelation`.
   
   One thing worth pinning down in the Option B sketch: the holder's 
`pushedFilterExpressions` isn't the pre-existing 
`SupportsPushDownFilters.pushedFilters()` (the inclusive set, filters pushed 
whether or not they're also kept post-scan). It's narrower -- only the filters 
that were *fully* pushed and removed from the post-scan set. The field on the 
relation is `pushedFilters`, mapped from the holder's `pushedFilterExpressions` 
via `remappedPushedFilters`. In `V2ScanRelationPushDown.pushDownFilters` it's 
computed as 
`normalizedFiltersWithoutSubquery.filterNot(postScanFilterSet.contains).filter(_.deterministic)`,
and the comment in that same method notes that pushed and post-scan filters 
can overlap ("e.g. the parquet row group filter"). For best-effort file sources 
that's exactly what happens: the `WHERE` predicate is pushed for row-group 
pruning *and* kept as a post-scan `Filter`, so it lands in the post-scan set 
and is excluded from `pushedFilterExpressions`. The relaxed OR-widening in 
this PR widens `FileScan`'s `dataFilters`, which are precisely those 
best-effort filters that never land in `pushedFilterExpressions`.
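
   To make the narrowing concrete, here is a minimal toy sketch of that split. 
It is not Spark code: `Pred`, `fullyPushed`, and the example values are 
hypothetical stand-ins, and plain `Set` membership stands in for the semantic 
comparison Catalyst would do on real expressions.

```scala
object PushedFilterSketch {
  // Hypothetical stand-in for a Catalyst predicate.
  final case class Pred(sql: String, deterministic: Boolean = true)

  // Same shape as the expression quoted above: keep only filters that are
  // absent from the post-scan set and are deterministic.
  def fullyPushed(normalized: Seq[Pred], postScan: Seq[Pred]): Seq[Pred] = {
    val postScanSet = postScan.toSet
    normalized.filterNot(postScanSet.contains).filter(_.deterministic)
  }

  val where: Pred = Pred("a > 10")

  // Best-effort source: the predicate is used for pruning but is also kept
  // post-scan, so nothing counts as fully pushed (result is empty).
  val bestEffort: Seq[Pred] = fullyPushed(Seq(where), postScan = Seq(where))

  // Exact source: nothing is kept post-scan, so the predicate survives
  // (result is Seq(where)).
  val exact: Seq[Pred] = fullyPushed(Seq(where), postScan = Seq.empty)
}
```

   In this model the best-effort case ends with an empty fully-pushed set even 
though a filter did reach the scan, which is the information a merge keyed on 
the relation's `pushedFilters` would be missing.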
   
   A merge driven off the relation's `pushedFilters` therefore can't see the 
filters it would need to widen, so it can't reproduce the relaxed path that 
gives the q9-style win -- the OR-of-`WHERE`-clauses case where widening lets 
two scans share one pass. There's a sharper version of this: when two file 
scans differ only in `WHERE`, both of their `pushedFilters` are empty, because 
both predicates went to the post-scan side. A generic strict merge keyed on 
equal `pushedFilters` would then fold them into one full-table scan. Results 
stay correct -- the two `WHERE` residuals still sit above as per-side `Filter`s 
-- but each side loses its row-group pruning, and nothing on the relation 
distinguishes "genuinely no filter" from "best-effort filter pushed elsewhere." 
That's the distinction A uses to decline this case by default unless 
`ignorePushedDataFilters` is set. To get the relaxed behavior back, B would 
have to thread the post-scan `Filter` conditions down into the leaf merge and 
re-translate and re-push them, which is essentially re-doing what 
`FileScan.mergeWith` already does.
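
   The empty-`pushedFilters` case can be sketched with a toy model. Everything 
here is hypothetical: `ScanRel` carries only the fields the argument needs, 
strings stand in for expressions, and `strictMerge` is one possible reading of 
a generic strict merge, not code from either option.

```scala
object StrictMergeSketch {
  // Hypothetical stand-in for a V2 scan relation.
  final case class ScanRel(
      table: String,
      columns: Set[String],
      pushedFilters: Set[String],      // fully pushed filters only
      bestEffortFilters: Set[String])  // pruning filters the relation does not expose

  // Strict merge keyed on equal pushedFilters: union the projected columns.
  // The merge is modeled as blind to bestEffortFilters, so the merged scan
  // carries none; keeping one side's filter could prune rows the other needs.
  def strictMerge(a: ScanRel, b: ScanRel): Option[ScanRel] =
    if (a.table == b.table && a.pushedFilters == b.pushedFilters) {
      Some(ScanRel(a.table, a.columns ++ b.columns, a.pushedFilters, Set.empty))
    } else {
      None
    }

  // Two file scans that differ only in WHERE: both have empty pushedFilters.
  val left: ScanRel = ScanRel("t", Set("x"), Set.empty, Set("a < 3"))
  val right: ScanRel = ScanRel("t", Set("y"), Set.empty, Set("a > 7"))

  // The gate passes, and the result is a single scan with no pruning filter.
  val merged: Option[ScanRel] = strictMerge(left, right)
}
```

   Under these assumptions the two scans merge into one that reads columns `x` 
and `y` with no pruning filter at all, while two scans with different non-empty 
`pushedFilters` would be declined.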
   
   The strict column-union case (same pushed filters, differ only in projected 
columns) is universally safe and is the only on-by-default path. That's where B 
helps most, since it generalizes to every V2 source with no per-connector code. 
The relaxed OR-widen only makes sense for sources that keep a post-scan 
residual `Filter` -- best-effort pushdown, in practice file sources -- which is 
a property of where residual filters live, not a limitation of the `FileScan` 
approach. For an exact-pushdown source like JDBC, two scans differing in 
`WHERE` have *different* `pushedFilters`, so the gate 
(`ExpressionSet(np.pushedFilters) != ExpressionSet(cp.pushedFilters)`) declines 
them; declining is correct there, because with no `Filter` above there is nothing 
to re-apply per side.
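
   A small toy model of why the residual matters for the relaxed path. All 
names here are hypothetical, rows are plain maps, and the shared scan is 
modeled as applying the OR exactly, whereas a real best-effort scan may return 
a superset of the matching rows.

```scala
object OrWidenSketch {
  type Row = Map[String, Int]
  type Pred = Row => Boolean

  // Shared scan after OR-widening: keeps rows matching either side's predicate.
  def sharedScan(rows: Seq[Row], p1: Pred, p2: Pred): Seq[Row] =
    rows.filter(r => p1(r) || p2(r))

  // Each side re-applies its own residual predicate above the shared scan.
  def side(shared: Seq[Row], residual: Pred): Seq[Row] =
    shared.filter(residual)

  val rows: Seq[Row] = Seq(Map("a" -> 1), Map("a" -> 5), Map("a" -> 9))
  val p1: Pred = r => r("a") < 3
  val p2: Pred = r => r("a") > 7

  // Rows with a = 1 and a = 9 survive the widened scan.
  val shared: Seq[Row] = sharedScan(rows, p1, p2)

  // With the residual re-applied, side 1 sees exactly what its own scan would have.
  val side1: Seq[Row] = side(shared, p1)
}
```

   Here `side1` equals `rows.filter(p1)`, but `shared` on its own does not, 
which is the sense in which a source with no residual `Filter` above the scan 
has nothing to restore each side's rows after widening.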
   
   There's also a layering issue that affects where the generic body can live. 
`PlanMerger` and `MergeSubplans` are in catalyst, and so is the 
connector-filter translation (`V2ExpressionBuilder` / `PushableExpression`), so 
the `SupportsPushDownV2Filters` path (JDBC and friends) would be reachable from 
catalyst -- but that part alone wouldn't force a move. What pins a generic body 
to sql/core is the legacy `SupportsPushDownFilters` translation 
(`DataSourceStrategy.translateFilter`) and the `PushDownUtils.pushFilters` 
orchestration, which both live there. So the body belongs in sql/core, invoked 
from the V2 leaf in `PlanMerger`: "Spark once, not per-connector," but not zero 
code -- a marker interface still bridges the boundary, keyed on the standard 
`SupportsPushDown*` contracts rather than on the connector.
   
   I'd propose we land A first (complete, tested, ships the on-by-default 
strict column-union case), then generalize in a follow-up that (1) adds a 
generic Spark-side strict merge in sql/core behind the marker interface, (2) 
adds the `hasMergeBlockingPushdown` flag, and (3) keeps the relaxed widening 
connector-side unless we can make the post-scan residual visible to the leaf 
merge.
   
   Either way, yes please -- I'd like to take you up on the offer to sketch B. 
Seeing the two side by side would make the strict-vs-relaxed boundary concrete 
and show whether our follow-ups converge. If you can wire B so the post-scan 
residual is visible to the leaf merge, that dissolves my main concern -- I may 
well be missing a simpler path. cc @cloud-fan
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

