zikangh opened a new pull request, #56133:
URL: https://github.com/apache/spark/pull/56133

   ### What changes were proposed in this pull request?
   
   In `MicroBatchExecution.logicalPlan`, before calling `build()` on the V2 streaming scan
   builder, call `SupportsPushDownRequiredColumns.pruneColumns(output.toStructType)` if the
   builder supports it. `output` is the analyzed relation output, which already includes any
   metadata columns the query references (added by the `AddMetadataColumns` rule).
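
   The control flow described above can be sketched as follows. This is a minimal,
   self-contained illustration, not the actual patch: `Field`, `Schema`, `Scan`, `ScanBuilder`,
   `SupportsPushDownRequiredColumns`, and `buildStreamingScan` are hypothetical, simplified
   stand-ins (Spark's real interfaces live in `org.apache.spark.sql.connector.read` and take a
   `StructType`).

   ```scala
   // Hypothetical, simplified stand-ins for Spark's connector types.
   // Spark's real interfaces are in org.apache.spark.sql.connector.read.
   final case class Field(name: String)
   final case class Schema(fields: Seq[Field])

   trait Scan { def readSchema(): Schema }
   trait ScanBuilder { def build(): Scan }
   trait SupportsPushDownRequiredColumns extends ScanBuilder {
     def pruneColumns(requiredSchema: Schema): Unit
   }

   // Sketch of the proposed behavior: tell the builder which columns the analyzed
   // relation outputs (data columns plus referenced metadata columns) before build().
   def buildStreamingScan(builder: ScanBuilder, output: Schema): Scan = {
     builder match {
       case pushDown: SupportsPushDownRequiredColumns => pushDown.pruneColumns(output)
       case _ => () // this builder cannot receive required-column information
     }
     builder.build()
   }
   ```

   Builders that do not implement the pruning interface are built exactly as before, which
   matches the "if the builder supports it" condition.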
   
   ### Why are the changes needed?
   
   1. **Metadata column reads in V2 streaming crash with `ArrayIndexOutOfBoundsException`.**
      When a query selects a metadata column (e.g. `_metadata.row_id`) from a V2 streaming
      source that implements both `SupportsMetadataColumns` and `SupportsPushDownRequiredColumns`,
      the analyzed plan expects the metadata column in the scan output, but `Scan.readSchema()`
      does not include it. Spark tries to read a column at an index the scan never produced.
   
   2. **Root cause: `pruneColumns` is never called in streaming.**
      In batch, `V2ScanRelationPushDown` calls `SupportsPushDownRequiredColumns.pruneColumns`
      with the required schema (which includes metadata columns resolved by `AddMetadataColumns`)
      before `build()`. In `MicroBatchExecution.logicalPlan`, the scan is built directly with
      `table.newScanBuilder(options).build()`, and no pushdown of any kind is applied (a
      `// TODO: operator pushdown` comment marks this). Connectors that use `pruneColumns` to
      configure `readSchema()`, including whether to produce metadata columns, are therefore
      never informed of what the query needs.
   
   3. **This change fixes metadata column reads only, not column pruning.**
      We call `pruneColumns(output.toStructType)`, where `output` is the full analyzed relation
      output: all data columns plus any metadata columns added by `AddMetadataColumns`. This
      communicates the required metadata columns to the scan builder so they appear in
      `readSchema()`, but it does not prune data columns. Full column pruning in streaming, along
      with filter and aggregate pushdown, is deferred to the existing TODO.
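
   A toy model of the failure mode in point 1, assuming a hypothetical connector
   (`MetadataAwareBuilder`) that emits the metadata column only when `pruneColumns` requests it.
   For brevity the metadata is flattened to a single `_metadata` value and rows are plain arrays;
   this is not how Spark represents `_metadata.row_id` or `InternalRow`.

   ```scala
   import scala.util.Try

   // Hypothetical connector stand-in: rows carry the metadata column only if
   // pruneColumns asked for it.
   final class MetadataAwareBuilder {
     private var includeMetadata = false

     def pruneColumns(required: Seq[String]): Unit = {
       includeMetadata = required.contains("_metadata")
     }

     def readSchema: Seq[String] =
       if (includeMetadata) Seq("value", "_metadata") else Seq("value")

     // Produces one row whose layout follows readSchema.
     def readRow(value: String, rowId: Long): Array[Any] =
       if (includeMetadata) Array[Any](value, rowId) else Array[Any](value)
   }

   // The analyzed plan expects _metadata at ordinal 1.
   val analyzedOutput = Seq("value", "_metadata")
   val metadataOrdinal = analyzedOutput.indexOf("_metadata")

   // Old streaming path (no pruneColumns): ordinal 1 does not exist in the row.
   val unpruned = new MetadataAwareBuilder
   val oldRow = unpruned.readRow("a", 7L)
   val oldResult = Try(oldRow(metadataOrdinal))

   // Batch path, and streaming after this change: pruneColumns runs first.
   val pruned = new MetadataAwareBuilder
   pruned.pruneColumns(analyzedOutput)
   val newRow = pruned.readRow("a", 7L)
   ```

   In this model, `oldResult` fails with `ArrayIndexOutOfBoundsException`, while calling
   `pruneColumns` first makes the row layout match the analyzed output.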
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. V2 streaming queries that select metadata columns from a source implementing both
   `SupportsMetadataColumns` and `SupportsPushDownRequiredColumns` no longer fail with
   `ArrayIndexOutOfBoundsException`. Streaming scan builders that implement
   `SupportsPushDownRequiredColumns` will now receive `pruneColumns` calls with the projected
   schema at query initialization.
   
   ### How was this patch tested?
   
   Added a test in `DataStreamTableAPISuite` that wraps the V2 streaming scan builder with a
   recording `SupportsPushDownRequiredColumns` implementation and asserts that `pruneColumns`
   is called with the correct projected schema, including metadata columns.
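
   The recording-builder approach can be sketched like this. It is an illustration under
   assumptions, not the code in `DataStreamTableAPISuite`: the simplified traits,
   `RecordingScanBuilder`, and `buildScan` are hypothetical, and schemas are modeled as plain
   column-name lists.

   ```scala
   import scala.collection.mutable.ArrayBuffer

   // Hypothetical, simplified stand-ins; schemas are plain column-name lists.
   trait Scan { def readSchema(): Seq[String] }
   trait ScanBuilder { def build(): Scan }
   trait SupportsPushDownRequiredColumns extends ScanBuilder {
     def pruneColumns(requiredSchema: Seq[String]): Unit
   }

   // Wraps any builder, records each pruneColumns call, and delegates build().
   final class RecordingScanBuilder(delegate: ScanBuilder)
       extends SupportsPushDownRequiredColumns {
     private val calls = ArrayBuffer.empty[Seq[String]]
     def recorded: Seq[Seq[String]] = calls.toList
     override def pruneColumns(requiredSchema: Seq[String]): Unit = {
       calls += requiredSchema
     }
     override def build(): Scan = delegate.build()
   }

   // Stand-in for the streaming code path under test: prune if supported, then build.
   def buildScan(builder: ScanBuilder, output: Seq[String]): Scan = {
     builder match {
       case pushDown: SupportsPushDownRequiredColumns => pushDown.pruneColumns(output)
       case _ => ()
     }
     builder.build()
   }

   val base = new ScanBuilder {
     def build(): Scan = new Scan { def readSchema(): Seq[String] = Seq("value") }
   }
   val recorder = new RecordingScanBuilder(base)
   buildScan(recorder, Seq("value", "_metadata"))
   ```

   Because the wrapper only records calls and delegates `build()`, an assertion on `recorded`
   checks what the execution path sends to the builder, independent of how a real connector
   reacts to it.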
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Sonnet 4.6


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.