zikangh opened a new pull request, #56133:
URL: https://github.com/apache/spark/pull/56133
### What changes were proposed in this pull request?
In `MicroBatchExecution.logicalPlan`, before calling `build()` on the V2 streaming scan builder, call `SupportsPushDownRequiredColumns.pruneColumns(output.toStructType)` if the builder supports it. `output` is the analyzed relation output, which already includes any metadata columns the query references (added by the `AddMetadataColumns` rule).
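
The call pattern described above can be sketched in isolation. This is a minimal model, not the actual patch: the interfaces below are simplified stand-ins that reuse the names of Spark's connector interfaces but represent a schema as a list of column names instead of a `StructType`, and `PruneBeforeBuild` and `DemoBuilder` are hypothetical names introduced only for illustration.

```java
import java.util.List;

// Simplified stand-ins for Spark's connector interfaces (NOT the real API):
// a schema is modeled as a list of column names instead of a StructType.
interface Scan { List<String> readSchema(); }
interface ScanBuilder { Scan build(); }
interface SupportsPushDownRequiredColumns extends ScanBuilder {
    void pruneColumns(List<String> requiredSchema);
}

final class PruneBeforeBuild {
    // Hypothetical helper: if the builder can accept a required schema,
    // pass it the analyzed relation output, then build the scan.
    static Scan buildScan(ScanBuilder builder, List<String> analyzedOutput) {
        if (builder instanceof SupportsPushDownRequiredColumns) {
            ((SupportsPushDownRequiredColumns) builder).pruneColumns(analyzedOutput);
        }
        return builder.build();
    }
}

// Hypothetical connector that reports data columns only unless told otherwise.
final class DemoBuilder implements SupportsPushDownRequiredColumns {
    private List<String> schema = List.of("id", "value");

    @Override
    public void pruneColumns(List<String> requiredSchema) {
        schema = requiredSchema;
    }

    @Override
    public Scan build() {
        final List<String> snapshot = schema;
        return () -> snapshot;
    }
}
```

Builders that do not implement the pushdown interface are built exactly as before, which is why the check is an `instanceof` guard rather than a hard requirement.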
### Why are the changes needed?
1. **Metadata column reads in V2 streaming crash with `ArrayIndexOutOfBoundsException`.**
   When a query selects a metadata column (e.g. `_metadata.row_id`) from a V2 streaming source that implements both `SupportsMetadataColumns` and `SupportsPushDownRequiredColumns`, the analyzed plan expects the metadata column in the scan output, but `Scan.readSchema()` does not include it. Spark tries to read a column at an index the scan never produced.
2. **Root cause: `pruneColumns` is never called in streaming.**
   In batch, `V2ScanRelationPushDown` calls `SupportsPushDownRequiredColumns.pruneColumns` with the required schema (which includes metadata columns resolved by `AddMetadataColumns`) before `build()`. In `MicroBatchExecution.logicalPlan`, the scan is built directly with `table.newScanBuilder(options).build()`, so no pushdown of any kind is applied (a `// TODO: operator pushdown` comment marks this). Connectors that use `pruneColumns` to configure `readSchema()`, including whether to produce metadata columns, are never informed of what the query needs.
3. **This change fixes metadata column reads only, not column pruning.**
   We call `pruneColumns(output.toStructType)` where `output` is the full analyzed relation output: all data columns plus any metadata columns added by `AddMetadataColumns`. This communicates required metadata columns to the scan builder so they appear in `readSchema()`, but does not prune data columns. Full column pruning in streaming, along with filter and aggregate pushdown, is deferred to the existing TODO.
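
The index mismatch in point 1 and the effect of the fix can be illustrated with a small model. It is only a sketch under stated assumptions: rows are plain `Object[]` arrays, schemas are lists of column names, and `MetadataColumnMismatch`, `Source`, and `read` are hypothetical names, not Spark classes. Spark's real failure path goes through its internal row types.

```java
import java.util.List;

final class MetadataColumnMismatch {
    // Hypothetical source: emits only data columns unless it is told
    // the required schema through pruneColumns.
    static final class Source {
        private List<String> readSchema = List.of("id", "value");

        void pruneColumns(List<String> required) {
            readSchema = required;
        }

        // Produces one row with exactly one field per column in readSchema.
        Object[] nextRow() {
            Object[] row = new Object[readSchema.size()];
            for (int i = 0; i < row.length; i++) {
                row[i] = readSchema.get(i) + "-0";
            }
            return row;
        }
    }

    // Models the plan side: each column is bound to its ordinal in the
    // analyzed output, regardless of what the scan actually produced.
    static Object read(Object[] row, List<String> analyzedOutput, String column) {
        return row[analyzedOutput.indexOf(column)];
    }

    public static void main(String[] args) {
        List<String> analyzed = List.of("id", "value", "_metadata");

        Source fixed = new Source();
        fixed.pruneColumns(analyzed); // the call this change adds
        System.out.println(read(fixed.nextRow(), analyzed, "_metadata"));

        Source broken = new Source(); // pruneColumns never called
        try {
            read(broken.nextRow(), analyzed, "_metadata");
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("scan never produced ordinal 2");
        }
    }
}
```

Because the full analyzed output is passed, the model source still emits every data column, which matches point 3: the call informs the source about metadata columns without pruning anything.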
### Does this PR introduce _any_ user-facing change?
Yes. V2 streaming queries that select metadata columns from a source implementing both `SupportsMetadataColumns` and `SupportsPushDownRequiredColumns` no longer fail with `ArrayIndexOutOfBoundsException`. Streaming scan builders that implement `SupportsPushDownRequiredColumns` will now receive a `pruneColumns` call at query initialization, with the analyzed relation output as the required schema (all data columns plus any referenced metadata columns).
### How was this patch tested?
Added a test in `DataStreamTableAPISuite` that wraps the V2 streaming scan builder with a recording `SupportsPushDownRequiredColumns` implementation and asserts that `pruneColumns` is called with the correct projected schema, including metadata columns.
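
The shape of such a recording wrapper might look like the following. This is an illustrative sketch, not the test code from the PR: the interfaces are simplified stand-ins that borrow the names of Spark's connector interfaces and model a schema as a list of column names, and `RecordingScanBuilder` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for Spark's connector interfaces (NOT the real API).
interface Scan { List<String> readSchema(); }
interface ScanBuilder { Scan build(); }
interface SupportsPushDownRequiredColumns extends ScanBuilder {
    void pruneColumns(List<String> requiredSchema);
}

// Hypothetical wrapper: records every schema passed to pruneColumns,
// forwards the call when the delegate supports it, and delegates build().
final class RecordingScanBuilder implements SupportsPushDownRequiredColumns {
    private final ScanBuilder delegate;
    final List<List<String>> recorded = new ArrayList<>();

    RecordingScanBuilder(ScanBuilder delegate) {
        this.delegate = delegate;
    }

    @Override
    public void pruneColumns(List<String> requiredSchema) {
        recorded.add(List.copyOf(requiredSchema));
        if (delegate instanceof SupportsPushDownRequiredColumns) {
            ((SupportsPushDownRequiredColumns) delegate).pruneColumns(requiredSchema);
        }
    }

    @Override
    public Scan build() {
        return delegate.build();
    }
}
```

A test built on this idea would start the query against the wrapped builder and then assert on `recorded`, checking both that a call happened and that the recorded schema contains the metadata column.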
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]