gengliangwang commented on code in PR #56133:
URL: https://github.com/apache/spark/pull/56133#discussion_r3308559810


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala:
##########
@@ -224,7 +225,13 @@ class MicroBatchExecution(
               log"from DataSourceV2 named 
'${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " +
               log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
             // TODO: operator pushdown.
-            val scan = table.newScanBuilder(options).build()
+            val scanBuilder = table.newScanBuilder(options)
+            scanBuilder match {
+              case r: SupportsPushDownRequiredColumns =>
+                r.pruneColumns(output.toStructType)

Review Comment:
   `ContinuousExecution.logicalPlan` 
(sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala:95)
 has the identical pattern — `table.newScanBuilder(options).build()` with the 
same `// TODO: operator pushdown` comment. A query that selects a metadata 
column from a V2 continuous source implementing both `SupportsMetadataColumns` 
and `SupportsPushDownRequiredColumns` should hit the same 
`ArrayIndexOutOfBoundsException`. Either apply this fix symmetrically there, or 
note in the PR description why microbatch-only is sufficient.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala:
##########
@@ -224,7 +225,13 @@ class MicroBatchExecution(
               log"from DataSourceV2 named 
'${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " +
               log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
             // TODO: operator pushdown.
-            val scan = table.newScanBuilder(options).build()
+            val scanBuilder = table.newScanBuilder(options)
+            scanBuilder match {
+              case r: SupportsPushDownRequiredColumns =>
+                r.pruneColumns(output.toStructType)
+              case _ =>
+            }
+            val scan = scanBuilder.build()

Review Comment:
   The batch analogue in `PushDownUtils.pruneColumns` rebinds the relation's 
output from `scan.readSchema()` after `build()` (via 
`toOutputAttrs(scan.readSchema(), relation)`) so the relation reflects what the 
scan actually produces. Here we keep the analyzed `output` and trust the 
connector to produce a matching `readSchema()`. If a connector reorders fields 
or silently drops an unrecognized column, downstream binding will fail with a 
different cryptic error that looks like the bug this PR fixes — making future 
regressions confusing to diagnose. Consider either adopting the batch defensive 
rebind, or adding a short comment explaining why the analyzed `output` is safe 
to trust here.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala:
##########
@@ -564,6 +564,40 @@ class DataStreamTableAPISuite extends StreamTest with 
BeforeAndAfter {
     }
   }
 
+  test("pruneColumns called on SupportsPushDownRequiredColumns V2 streaming 
scan builder") {
+    val tblName = "teststream.table_name"
+    withTable(tblName) {
+      spark.sql(s"CREATE TABLE $tblName (data int) USING foo")
+      val stream = MemoryStream[Int]
+      val testCatalog = 
spark.sessionState.catalogManager.catalog("teststream").asTableCatalog
+      val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+        .asInstanceOf[InMemoryStreamTable]
+      table.setStream(stream)
+
+      // Wrap the table's scan builder so we can record pruneColumns calls.
+      val recorded = new PrunedSchemaRecorder
+      table.scanBuilderWrapper = Some(inner => new 
RecordingPruneScanBuilder(inner, recorded))
+
+      withTempDir { checkpointDir =>
+        val q = spark.readStream.table(tblName)
+          .select("value", "_seq")
+          .writeStream.format("noop")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .start()
+        try {
+          stream.addData(1, 2, 3)
+          q.processAllAvailable()
+          assert(recorded.called, "pruneColumns should have been called on the 
streaming scan builder")

Review Comment:
   This test asserts the `pruneColumns` API call happens, but doesn't actually 
exercise the user-visible bug. `RecordingPruneScanBuilder.build()` returns the 
unmodified inner `MemoryStreamScanBuilder` whose `readSchema()` is 
`stream.fullSchema() = {value}` — `_seq` is never produced by the scan 
regardless of what `pruneColumns` received. So the assertion passes even though 
the original `ArrayIndexOutOfBoundsException` scenario isn't reproduced here. 
The test protects against regression of the API call but not the failure mode 
described in the PR.
   
   Consider adding a second test that uses a connector whose `readSchema()` 
actually depends on the prune input (conditionally exposing `_seq` when 
requested) and asserts via a memory sink (not `noop`) that selecting `_seq` 
returns expected values end-to-end. That would protect against connector-side 
breakage and document the contract the fix relies on.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala:
##########
@@ -564,6 +564,40 @@ class DataStreamTableAPISuite extends StreamTest with 
BeforeAndAfter {
     }
   }
 
+  test("pruneColumns called on SupportsPushDownRequiredColumns V2 streaming 
scan builder") {

Review Comment:
   Minor: adjacent tests use a `SPARK-NNNNN:` prefix (e.g. line 553 
`SPARK-44865:`). Suggest matching that convention.
   ```suggestion
     test("SPARK-56132: pruneColumns called on SupportsPushDownRequiredColumns 
V2 streaming scan builder") {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to