peter-toth commented on PR #56264:
URL: https://github.com/apache/spark/pull/56264#issuecomment-4623467446

   Thanks for picking this up @LuciferYang. I haven't done a thorough review of 
the PR yet, but I read it through carefully alongside my old #37711 and wanted 
to share a thought on the approach before getting into the details.
   
   The shape here is essentially the same as #37711 (which never landed), 
generalized in two important ways: the merge logic now lives once on `FileScan` 
instead of per-format, and you've added OR-widening of pushed data filters 
under `spark.sql.files.scanMerge.ignorePushedDataFilters`.
   
   I have one concern about the design direction, and an alternative worth 
considering before we commit to this shape:
   
   **The current shape (call it Option A)**
   
   The merge body lives on the connector side, in `FileScan.mergeWith`. That 
works cleanly for built-in `FileScan` formats — they all live in Spark and 
share one base. I'm not too concerned about file sources for that reason. But:
   
   - Even within Spark, the in-tree V2 JDBC source (`JDBCScan`) is not a 
`FileScan`. To get parity there, someone would have to re-implement `mergeWith` 
against `JDBCScanBuilder` separately.
   - External V2 connectors (Iceberg, Delta, Hudi, BigQuery, Cassandra, 
in-house connectors) would each have to write their own `mergeWith`. In 
practice they'll either lag this feature or skip it entirely, and we end up 
with V2 parity with V1 only for built-in file formats.
   
   **An alternative (Option B)**
   
   [SPARK-56385](https://issues.apache.org/jira/browse/SPARK-56385) (April 
2026) added `pushedFilters: Seq[Expression]` to `DataSourceV2ScanRelation`. The 
motivation there was constraint propagation, but it incidentally gives us most 
of the information needed to do this merge entirely on the Spark side:
   
   1. Small follow-up to SPARK-56385: also record non-filter pushdowns on the 
relation, either as the existing `PushedDownOperators` record (already built 
internally in `V2ScanRelationPushDown.ScanBuilderHolder`, line 1052) or, 
simpler, a single `hasMergeBlockingPushdown: Boolean` set when aggregate / 
limit / offset / topN / sample / join / variant pushdown happened. This is 
independently useful for explain output and future optimizer work, not just 
plan-merge.
   
   2. In `PlanMerger`, do the merge body Spark-side using only standard 
contracts:
      ```scala
      if (np.relation.canonicalized != cp.relation.canonicalized) return None
      if (np.hasMergeBlockingPushdown || cp.hasMergeBlockingPushdown) return None
      val builder =
        np.relation.table.asInstanceOf[SupportsRead].newScanBuilder(np.relation.options)
      // OR-widen np.pushedFilters with cp.pushedFilters
      // (strict / relaxed split same as this PR)
      // pruneColumns to readSchema().merge(...)
      // build, return as the merged scan
      ```
   
   3. `SupportsScanMerging` collapses to a **zero-method marker interface**. A 
connector opts in by declaring it; Spark provides the merge body using 
`SupportsPushDownFilters` / `SupportsPushDownRequiredColumns`, which the 
connector already implements as part of normal pushdown.
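
   To make steps 1 and 3 concrete, here is a rough, Spark-free sketch of the
   gate that would decide whether two scans are merge candidates. Everything
   in it is hypothetical: `PushedOps`, `ToyScan`, `ToySupportsScanMerging`,
   `ToyRelation` and `MergeGate` are toy stand-ins, not existing Spark classes,
   and comparing table names stands in for comparing canonicalized relations.
   I'm also assuming the marker sits on the scan; it could equally sit on the
   table.

   ```scala
   // Toy model only: none of these types are Spark's; all names are hypothetical.
   final case class PushedOps(
       aggregate: Boolean = false,
       limit: Option[Int] = None,
       offset: Option[Int] = None,
       topN: Boolean = false,
       sample: Boolean = false,
       join: Boolean = false,
       variant: Boolean = false) {
     // Any pushdown other than filters / column pruning blocks the merge.
     def blocksMerge: Boolean =
       aggregate || limit.isDefined || offset.isDefined ||
         topN || sample || join || variant
   }

   trait ToyScan
   trait ToySupportsScanMerging extends ToyScan // zero-method marker

   final case class ToyRelation(table: String, scan: ToyScan, pushed: PushedOps)

   object MergeGate {
     // Same relation, both connectors opted in, no merge-blocking pushdown.
     def canMerge(a: ToyRelation, b: ToyRelation): Boolean =
       a.table == b.table &&
         a.scan.isInstanceOf[ToySupportsScanMerging] &&
         b.scan.isInstanceOf[ToySupportsScanMerging] &&
         !a.pushed.blocksMerge && !b.pushed.blocksMerge
   }

   object MergeGateDemo {
     private val optedIn = new ToySupportsScanMerging {}
     private val optedOut = new ToyScan {}

     val plain = ToyRelation("t", optedIn, PushedOps())
     val limited = ToyRelation("t", optedIn, PushedOps(limit = Some(10)))
     val noMarker = ToyRelation("t", optedOut, PushedOps())
   }
   ```

   In this toy, `plain` merges with itself, while `limited` (a pushed limit)
   and `noMarker` (connector did not opt in) are both rejected. The point is
   only that the connector contributes a declaration and nothing else.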
   
   The strict/relaxed split and the `ignorePushedDataFilters` config carry over 
unchanged, so the capability is identical. The format-specific 
`canMergeScanStateWith` / `hasAggregatePushedDown` hooks go away — Parquet's 
`pushedVariantExtractions` is itself a tracked pushdown axis 
(`pushDownVariants`), so it falls under the merge-blocking flag for free.
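
   For readers less familiar with the OR-widening idea, a minimal Spark-free
   illustration follows. It is a toy under stated assumptions, not the PR's
   implementation: `Row`, `Pred` and `OrWiden` are hypothetical names, filters
   are plain Scala predicates, and I'm assuming each consumer re-applies its
   own filters above the merged scan. The strict / relaxed distinction is not
   modelled here.

   ```scala
   // Toy model only: Row, Pred and OrWiden are hypothetical, not Spark classes.
   object OrWiden {
     type Row = Map[String, Int]
     type Pred = Row => Boolean

     // A scan's pushed filters form an implicit conjunction.
     def conj(ps: Seq[Pred]): Pred = row => ps.forall(p => p(row))

     // The merged scan must return every row that either original scan would.
     def widen(a: Seq[Pred], b: Seq[Pred]): Pred =
       row => conj(a)(row) || conj(b)(row)

     val rows: Seq[Row] = Seq(
       Map("x" -> 1, "y" -> 1),
       Map("x" -> 7, "y" -> 9),
       Map("x" -> 9, "y" -> 2),
       Map("x" -> 2, "y" -> 8))

     val filtersA: Seq[Pred] = Seq((r: Row) => r("x") > 5)
     val filtersB: Seq[Pred] = Seq((r: Row) => r("y") < 3)

     // What the single merged scan reads.
     val merged: Seq[Row] = rows.filter(widen(filtersA, filtersB))

     // Each consumer re-applies its own filters on top of the merged output.
     val forA: Seq[Row] = merged.filter(conj(filtersA))
     val forB: Seq[Row] = merged.filter(conj(filtersB))
   }
   ```

   The merged scan reads a superset of what either side needs, and
   re-filtering recovers exactly the rows each original scan would have
   produced, which is why widening is safe as long as the per-consumer filters
   stay in the plan.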
   
   **Trade-off**
   
   - Option A (this PR): localized; built-in file formats get parity. Each 
non-`FileScan` source — in-tree JDBC and every external connector — pays for 
its own `mergeWith` later.
   - Option B: depends on a small SPARK-56385 follow-up, but every V2 source 
(file or otherwise, in-tree or external) gets parity with no per-source code. 
The connector-side change is one trait declaration.
   
   I don't have a position yet on whether to swap this PR for B or land A first 
and follow up — wanted to put the alternative on the table since it changes who 
pays the cost (Spark once vs. every connector). If you'd like to experiment 
with Option B yourself, please go ahead; if you'd prefer, I can sketch it as a 
draft so we can compare side by side. Let me know which you'd like.
   
   cc @cloud-fan


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

