LuciferYang commented on PR #56264:
URL: https://github.com/apache/spark/pull/56264#issuecomment-4627713124
@peter-toth Sorry for missing PR #37711 earlier, and many thanks for your
review and suggestions. You're right that with the body on `FileScan`, only
the built-in file formats get parity, and that in-tree `JDBCScan` plus every
external connector would each have to re-implement `mergeWith`. Generalizing
"who pays the cost" the way Option B does is clearly the better long-term
direction, and the `hasMergeBlockingPushdown` idea is something I'd want
regardless of which shape we land on. It collapses the format-specific hooks
the PR currently carries -- `hasAggregatePushedDown` and Parquet's
`canMergeScanStateWith` / `pushedVariantExtractions` check -- into a single
tracked flag, and it's independently useful for explain output and the
constraint-propagation TODO already sitting on `DataSourceV2ScanRelation`.
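To make the single-flag idea concrete, here is a toy model in Python (for brevity; it is not Spark code). Each pushdown step that makes a scan unsafe to merge records that on one shared flag, and the merge consults only that flag. `ScanState`, `push_aggregate`, `push_variant_extractions`, and `merge_allowed` are hypothetical names. Only `hasMergeBlockingPushdown` comes from the discussion, and its exact shape on the real holder is an assumption.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ScanState:
    """Toy stand-in for a V2 scan after pushdown (hypothetical, not Spark's holder)."""
    columns: frozenset
    # One tracked flag in place of per-format hooks such as hasAggregatePushedDown.
    has_merge_blocking_pushdown: bool = False
    # Why the flag was set: the kind of detail explain output could surface.
    blocking_reasons: List[str] = field(default_factory=list)

    def _block(self, reason: str) -> None:
        self.has_merge_blocking_pushdown = True
        self.blocking_reasons.append(reason)


def push_aggregate(scan: ScanState) -> None:
    # Hypothetical aggregate-pushdown step: it records that it fired.
    scan._block("aggregate pushed down")


def push_variant_extractions(scan: ScanState) -> None:
    # Hypothetical Parquet variant-extraction step: same flag, different reason.
    scan._block("variant extractions pushed down")


def merge_allowed(a: ScanState, b: ScanState) -> bool:
    # The merge needs no format-specific checks, only the shared flag.
    return not (a.has_merge_blocking_pushdown or b.has_merge_blocking_pushdown)
```

Two fresh `ScanState`s can be merged. After `push_aggregate` runs on either one, `merge_allowed` returns `False`, and `blocking_reasons` explains why.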
One thing worth pinning down in the Option B sketch: the holder's
`pushedFilterExpressions` isn't the pre-existing
`SupportsPushDownFilters.pushedFilters()` (the inclusive set, filters pushed
whether or not they're also kept post-scan). It's narrower -- only the filters
that were *fully* pushed and removed from the post-scan set. The field on the
relation is `pushedFilters`, mapped from the holder's `pushedFilterExpressions`
via `remappedPushedFilters`. In `V2ScanRelationPushDown.pushDownFilters` it's
computed as
`normalizedFiltersWithoutSubquery.filterNot(postScanFilterSet.contains).filter(_.deterministic)`,
and the comment in that same method notes that pushed and post-scan filters
can overlap ("e.g. the parquet row group filter"). For best-effort file sources
that's exactly what happens: the `WHERE` predicate is pushed for row-group
pruning *and* kept as a post-scan `Filter`, so it lands in the post-scan set
and is excluded from `pushedFilterExpressions`. The relaxed OR-widening
in this PR widens `FileScan`'s `dataFilters`, which are precisely those
best-effort filters that never land in `pushedFilterExpressions`.
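A toy Python model of the quoted computation shows why best-effort filters drop out. `Pred`, `pushed_filter_expressions`, `best_effort_push`, and `exact_push` are illustrative names, not Spark APIs. Only the filter expression in the docstring is taken from `V2ScanRelationPushDown.pushDownFilters`.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Pred:
    """A predicate identified by its SQL text (toy stand-in for an Expression)."""
    sql: str
    deterministic: bool = True


def pushed_filter_expressions(normalized: List[Pred], post_scan: List[Pred]) -> List[Pred]:
    """Mirrors:
    normalizedFiltersWithoutSubquery.filterNot(postScanFilterSet.contains).filter(_.deterministic)
    """
    post_scan_set = set(post_scan)
    return [p for p in normalized if p not in post_scan_set and p.deterministic]


def best_effort_push(filters: List[Pred]) -> Tuple[List[Pred], List[Pred]]:
    """Hypothetical file source: uses every filter for row-group pruning, but
    still returns all of them as post-scan residuals because pruning is inexact."""
    used_for_pruning = list(filters)
    post_scan = list(filters)
    return used_for_pruning, post_scan


def exact_push(filters: List[Pred]) -> Tuple[List[Pred], List[Pred]]:
    """Hypothetical exact source (JDBC-like): evaluates every filter, no residual."""
    return list(filters), []
```

For `where = [Pred("x > 10")]`, the best-effort source uses the predicate for pruning, yet `pushed_filter_expressions(where, residual)` comes back empty. With the exact source, it comes back as `where`.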
A merge driven off the relation's `pushedFilters` therefore can't see the
filters it would need to widen, so it can't reproduce the relaxed path that
gives the q9-style win -- the OR-of-`WHERE`-clauses case where widening lets
two scans share one pass. There's a sharper version of this: when two file
scans differ only in `WHERE`, both of their `pushedFilters` are empty, because
both predicates went to the post-scan side. A generic strict merge keyed on
equal `pushedFilters` would then fold them into one full-table scan. Results
stay correct -- the two `WHERE` residuals still sit above as per-side `Filter`s
-- but each side loses its row-group pruning, and nothing on the relation
distinguishes "genuinely no filter" from "best-effort filter pushed elsewhere."
That's the distinction Option A (this PR) uses to decline this case by default unless
`ignorePushedDataFilters` is set. To get the relaxed behavior back, B would
have to thread the post-scan `Filter` conditions down into the leaf merge and
re-translate and re-push them, which is essentially redoing what
`FileScan.mergeWith` already does.
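The sharper case can be reproduced in a toy Python model. `ScanRelation`, `relation_view`, and `generic_strict_merge` are hypothetical. The point is that a merge reading only relation-level `pushedFilters` sees identical inputs for "no filter" and "best-effort filter", so it rebuilds a scan without pruning.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ScanRelation:
    """Toy stand-in for DataSourceV2ScanRelation (field names are illustrative)."""
    table: str
    columns: frozenset
    pushed_filters: frozenset   # relation-level pushedFilters: fully pushed only
    pruning_filters: frozenset  # best-effort filters held inside the opaque Scan


def relation_view(r: ScanRelation):
    # Everything a generic, connector-agnostic merge can inspect.
    return (r.table, r.pushed_filters)


def generic_strict_merge(a: ScanRelation, b: ScanRelation):
    """Hypothetical generic merge keyed on equal pushedFilters."""
    if relation_view(a) != relation_view(b):
        return None
    # The merged scan is rebuilt from the relation-level view only, so the
    # per-side pruning filters are lost: both sides now read the full table.
    return ScanRelation(a.table, a.columns | b.columns, a.pushed_filters, frozenset())
```

Results remain correct because each side's residual `Filter` still sits above the merged scan. The cost is the lost row-group pruning.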
The strict column-union case (same pushed filters, differ only in projected
columns) is universally safe and is the only on-by-default path. That's where B
helps most, since it generalizes to every V2 source with no per-connector code.
The relaxed OR-widen only makes sense for sources that keep a post-scan
residual `Filter` -- best-effort pushdown, in practice file sources -- which is
a property of where residual filters live, not a limitation of the FileScan
approach. For an exact-pushdown source like JDBC, two scans differing in
`WHERE` have *different* `pushedFilters`, so the gate
(`ExpressionSet(np.pushedFilters) != ExpressionSet(cp.pushedFilters)`) declines
them; declining is correct there, because with no `Filter` above, there is nothing
to re-apply per side.
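The gate and the two merge paths can be summarized in a toy Python model. `Scan`, `try_merge`, and `allow_relaxed` are hypothetical. `allow_relaxed` plays the role of the `ignorePushedDataFilters` opt-in, and the string-built OR stands in for real expression widening.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Scan:
    table: str
    columns: frozenset
    pushed_filters: frozenset           # exact, fully pushed filters
    data_filters: Tuple[str, ...] = ()  # best-effort filters, also kept as a residual Filter
    keeps_residual: bool = False        # True for best-effort (file) sources


def _conj(filters: Tuple[str, ...]) -> str:
    # An empty conjunction is "true" (no restriction on that side).
    return " AND ".join(filters) or "true"


def try_merge(a: Scan, b: Scan, allow_relaxed: bool = False) -> Optional[Scan]:
    # Gate, mirroring ExpressionSet(np.pushedFilters) != ExpressionSet(cp.pushedFilters).
    if a.table != b.table or a.pushed_filters != b.pushed_filters:
        return None
    columns = a.columns | b.columns
    if a.data_filters == b.data_filters:
        # Strict column-union: same filters, only the projection differs.
        return Scan(a.table, columns, a.pushed_filters, a.data_filters, a.keeps_residual)
    if allow_relaxed and a.keeps_residual and b.keeps_residual:
        # Relaxed OR-widen: the scan reads rows matching either side, and each
        # side's residual Filter above the scan re-applies its own predicate.
        widened = (f"({_conj(a.data_filters)}) OR ({_conj(b.data_filters)})",)
        return Scan(a.table, columns, a.pushed_filters, widened, True)
    return None
```

Two JDBC-like scans with different exact `pushed_filters` never reach either path. Two file scans differing only in `data_filters` merge only when `allow_relaxed` is set.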
There's also a layering issue that affects where the generic body can live.
`PlanMerger` and `MergeSubplans` are in catalyst, and so is the
connector-filter translation (`V2ExpressionBuilder` / `PushableExpression`), so
the `SupportsPushDownV2Filters` path (JDBC and friends) would be reachable from
catalyst -- but that part alone wouldn't force a move. What pins a generic body
to sql/core is the legacy `SupportsPushDownFilters` translation
(`DataSourceStrategy.translateFilter`) and the `PushDownUtils.pushFilters`
orchestration, which both live there. So the body belongs in sql/core, invoked
from the V2 leaf in `PlanMerger`: "Spark once, not per-connector," but not zero
code -- a marker interface still bridges the boundary, keyed on the standard
`SupportsPushDown*` contracts rather than on the connector.
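One way to picture the boundary is a small dependency-inversion sketch in Python. An interface is visible to the catalyst-side leaf merge, and the body behind it lives in sql/core and dispatches on the pushdown contract. `ScanMergeSupport`, `SqlCoreStrictMerge`, and `plan_merger_v2_leaf` are hypothetical names, and the mapping from contract to translation path is a label taken from the paragraph above. This is one reading of the proposal, not a settled design.

```python
from abc import ABC, abstractmethod
from typing import Optional


class ScanMergeSupport(ABC):
    """Hypothetical interface, defined where catalyst's PlanMerger can see it."""

    @abstractmethod
    def merge(self, a: dict, b: dict) -> Optional[dict]:
        ...


class SqlCoreStrictMerge(ScanMergeSupport):
    """Hypothetical sql/core body: one Spark-side strict merge for every source,
    dispatching on the pushdown contract rather than on the connector."""

    # Contract -> translation path it would need (labels only; nothing is translated here).
    TRANSLATION_PATH = {
        "SupportsPushDownV2Filters": "V2ExpressionBuilder",
        "SupportsPushDownFilters": "DataSourceStrategy.translateFilter",
    }

    def merge(self, a: dict, b: dict) -> Optional[dict]:
        contract = a["contract"]
        if contract != b["contract"] or contract not in self.TRANSLATION_PATH:
            return None
        if a["pushed_filters"] != b["pushed_filters"]:
            return None  # strict merge only; no relaxed widening here
        return {
            "contract": contract,
            "translation_path": self.TRANSLATION_PATH[contract],
            "pushed_filters": a["pushed_filters"],
            "columns": a["columns"] | b["columns"],
        }


def plan_merger_v2_leaf(a: dict, b: dict, support: Optional[ScanMergeSupport]) -> Optional[dict]:
    """Toy catalyst-side leaf: knows only the interface, never the sql/core class."""
    if support is None:
        return None  # no body registered: decline to merge
    return support.merge(a, b)
```

The catalyst side compiles against `ScanMergeSupport` alone. The sql/core side supplies the implementation, so connectors write no merge code of their own.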
I'd propose we land A first (complete, tested, ships the on-by-default
strict column-union case), then generalize in a follow-up that (1) adds a
generic Spark-side strict merge in sql/core behind the marker interface, (2)
adds the `hasMergeBlockingPushdown` flag, and (3) keeps the relaxed widening
connector-side unless we can make the post-scan residual visible to the leaf
merge.
Either way, yes please -- I'd like to take you up on the offer to sketch B.
Seeing the two side by side would make the strict-vs-relaxed boundary concrete
and show whether our follow-ups converge. If you can wire B so the post-scan
residual is visible to the leaf merge, that dissolves my main concern -- I may
well be missing a simpler path. cc @cloud-fan
--
This is an automated message from the Apache Git Service.