gengliangwang commented on code in PR #56133:
URL: https://github.com/apache/spark/pull/56133#discussion_r3308559810
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala:
##########
@@ -224,7 +225,13 @@ class MicroBatchExecution(
log"from DataSourceV2 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " +
log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
// TODO: operator pushdown.
- val scan = table.newScanBuilder(options).build()
+ val scanBuilder = table.newScanBuilder(options)
+ scanBuilder match {
+ case r: SupportsPushDownRequiredColumns =>
+ r.pruneColumns(output.toStructType)
Review Comment:
`ContinuousExecution.logicalPlan`
(sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala:95)
has the identical pattern — `table.newScanBuilder(options).build()` with the
same `// TODO: operator pushdown` comment. A query that selects a metadata
column from a V2 continuous source implementing both `SupportsMetadataColumns`
and `SupportsPushDownRequiredColumns` should hit the same
`ArrayIndexOutOfBoundsException`. Either apply this fix symmetrically there, or
note in the PR description why microbatch-only is sufficient.
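Here is a minimal sketch of the symmetric option, which factors the pruning into one helper that both engines call. The traits below are simplified stand-ins that only mirror the shape of Spark's `ScanBuilder`, `Scan` and `SupportsPushDownRequiredColumns`. They are not the real `org.apache.spark.sql.connector.read` types, and `Schema` replaces `StructType`. `buildPrunedScan` is a hypothetical name. These substitutions let the snippet compile on its own.

```scala
// Stand-ins that mirror the shape of Spark's connector read API
// (hypothetical and simplified; NOT the real org.apache.spark.sql.connector.read types).
final case class Schema(fieldNames: Seq[String])

trait Scan { def readSchema(): Schema }

trait ScanBuilder { def build(): Scan }

trait SupportsPushDownRequiredColumns extends ScanBuilder {
  def pruneColumns(requiredSchema: Schema): Unit
}

object StreamingScans {
  // Hypothetical shared helper: MicroBatchExecution and ContinuousExecution would
  // both call this instead of `table.newScanBuilder(options).build()`, so the
  // pruning step cannot be applied in one engine and forgotten in the other.
  def buildPrunedScan(builder: ScanBuilder, required: Schema): Scan = {
    builder match {
      case r: SupportsPushDownRequiredColumns => r.pruneColumns(required)
      case _ => // connector cannot prune; it builds with its default schema
    }
    builder.build()
  }
}
```

Keeping the `match` in one place means a later change, such as real operator pushdown replacing the `// TODO`, only has to be made once.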
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala:
##########
@@ -224,7 +225,13 @@ class MicroBatchExecution(
log"from DataSourceV2 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " +
log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
// TODO: operator pushdown.
- val scan = table.newScanBuilder(options).build()
+ val scanBuilder = table.newScanBuilder(options)
+ scanBuilder match {
+ case r: SupportsPushDownRequiredColumns =>
+ r.pruneColumns(output.toStructType)
+ case _ =>
+ }
+ val scan = scanBuilder.build()
Review Comment:
The batch analogue in `PushDownUtils.pruneColumns` rebinds the relation's
output from `scan.readSchema()` after `build()` (via
`toOutputAttrs(scan.readSchema(), relation)`) so the relation reflects what the
scan actually produces. Here we keep the analyzed `output` and trust the
connector to produce a matching `readSchema()`. If a connector reorders fields
or silently drops an unrecognized column, downstream binding will fail with a
different cryptic error that looks like the bug this PR fixes — making future
regressions confusing to diagnose. Consider either adopting the batch defensive
rebind, or adding a short comment explaining why the analyzed `output` is safe
to trust here.
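Here is a sketch of the defensive rebind, modeled without Spark types. `Attr` stands in for an analyzed attribute, a list of field names stands in for `scan.readSchema()`, and `ReadSchemaRebind.rebind` is a hypothetical helper, not the real `toOutputAttrs`. The sketch assumes exact (case-sensitive) name matching and treats any column the analyzed output does not know about as an error.

```scala
// Simplified model of an analyzed output attribute (hypothetical: Spark's
// AttributeReference also carries a data type, nullability and metadata).
final case class Attr(name: String, exprId: Long)

object ReadSchemaRebind {
  // Returns the analyzed attributes reordered to match the scan's actual
  // readSchema() field order, keeping their exprIds so upstream references
  // still resolve. Mismatches raise a descriptive error up front instead of
  // surfacing later as an index-out-of-bounds failure during binding.
  def rebind(analyzed: Seq[Attr], readSchemaFields: Seq[String]): Seq[Attr] = {
    val byName = analyzed.map(a => a.name -> a).toMap
    val produced = readSchemaFields.toSet
    val dropped = analyzed.map(_.name).filterNot(produced.contains)
    if (dropped.nonEmpty) {
      throw new IllegalStateException(
        s"Scan did not produce requested column(s) ${dropped.mkString(", ")}; " +
          s"readSchema() was ${readSchemaFields.mkString("[", ", ", "]")}")
    }
    readSchemaFields.map { f =>
      byName.getOrElse(f, throw new IllegalStateException(
        s"Scan produced unexpected column '$f' that was not requested"))
    }
  }
}
```

Reusing the analyzed `exprId`s lets operators above the scan keep resolving after a reorder. The explicit checks turn a silent mismatch into an error that names the offending column.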
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala:
##########
@@ -564,6 +564,40 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
}
}
+ test("pruneColumns called on SupportsPushDownRequiredColumns V2 streaming scan builder") {
+ val tblName = "teststream.table_name"
+ withTable(tblName) {
+ spark.sql(s"CREATE TABLE $tblName (data int) USING foo")
+ val stream = MemoryStream[Int]
+ val testCatalog = spark.sessionState.catalogManager.catalog("teststream").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+ .asInstanceOf[InMemoryStreamTable]
+ table.setStream(stream)
+
+ // Wrap the table's scan builder so we can record pruneColumns calls.
+ val recorded = new PrunedSchemaRecorder
+ table.scanBuilderWrapper = Some(inner => new RecordingPruneScanBuilder(inner, recorded))
+
+ withTempDir { checkpointDir =>
+ val q = spark.readStream.table(tblName)
+ .select("value", "_seq")
+ .writeStream.format("noop")
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .start()
+ try {
+ stream.addData(1, 2, 3)
+ q.processAllAvailable()
+ assert(recorded.called, "pruneColumns should have been called on the streaming scan builder")
Review Comment:
This test asserts the `pruneColumns` API call happens, but doesn't actually
exercise the user-visible bug. `RecordingPruneScanBuilder.build()` returns the
unmodified inner `MemoryStreamScanBuilder` whose `readSchema()` is
`stream.fullSchema() = {value}` — `_seq` is never produced by the scan
regardless of what `pruneColumns` received. So the assertion passes even though
the original `ArrayIndexOutOfBoundsException` scenario isn't reproduced here.
The test protects against regression of the API call but not the failure mode
described in the PR.

Consider adding a second test that uses a connector whose `readSchema()`
actually depends on the prune input (conditionally exposing `_seq` when
requested) and asserts via a memory sink (not `noop`) that selecting `_seq`
returns expected values end-to-end. That would protect against connector-side
breakage and document the contract the fix relies on.
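Here is a minimal, Spark-free model of such a connector. It uses hypothetical names (`ToyScanBuilder`, `ToyScan`, and a simplified `Schema`), and it treats `_seq` as the row's position, which is an assumption made only for illustration. In the real suite, an equivalent builder would presumably be installed through `scanBuilderWrapper`, with results checked through a memory sink. This sketch only shows the shape of the contract.

```scala
// Hypothetical, Spark-free model of the connector contract; not a Spark API.
final case class Schema(fieldNames: Seq[String])

// A scan over in-memory data whose columns are whatever the builder requested.
// The metadata column `_seq` is modeled as the row's position (an assumption).
final class ToyScan(cols: Seq[String], data: Seq[Long]) {
  def readSchema(): Schema = Schema(cols)
  def rows(): Seq[Seq[Long]] = data.zipWithIndex.map { case (v, i) =>
    cols.map {
      case "value" => v
      case "_seq"  => i.toLong
      case other   => throw new IllegalArgumentException(s"unknown column $other")
    }
  }
}

// Exposes `_seq` in readSchema() only when pruneColumns asked for it, so a test
// built on it fails if the engine stops passing the required columns through.
final class ToyScanBuilder(data: Seq[Long]) {
  private var requested: Seq[String] = Seq("value") // data columns only by default
  def pruneColumns(required: Schema): Unit = requested = required.fieldNames
  def build(): ToyScan = new ToyScan(requested, data)
}
```

Without the `pruneColumns` call, the scan reports only `value`. A query expecting `_seq` would therefore not receive it, which is the regression an end-to-end test should catch.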
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala:
##########
@@ -564,6 +564,40 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
}
}
+ test("pruneColumns called on SupportsPushDownRequiredColumns V2 streaming scan builder") {
Review Comment:
Minor: adjacent tests use a `SPARK-NNNNN:` prefix (e.g. line 553
`SPARK-44865:`). Suggest matching that convention.
```suggestion
test("SPARK-56132: pruneColumns called on SupportsPushDownRequiredColumns V2 streaming scan builder") {
```
--
This is an automated message from the Apache Git Service.