jalpan-randeri opened a new pull request, #55679:
URL: https://github.com/apache/spark/pull/55679
### What changes were proposed in this pull request?
This PR adds predicate pushdown support to Spark Structured Streaming for
DataSource V2 (DSv2) sources.
This allows DSv2 connectors (such as Apache Iceberg) to perform metadata-level
file pruning and reduce I/O for streaming micro-batches.
### Why are the changes needed?
Currently, Spark Structured Streaming via the DSv2 API does not push down
predicates. As a result, more data is scanned than necessary and then filtered
out at the engine layer, which causes excessive I/O, driver bottlenecks, and
increased latency.
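The cost difference described above can be illustrated with a small toy model. This is only a sketch and not the code in this PR: `DataFile`, `PushdownSketch`, and the file paths are hypothetical names made up for illustration, and none of them are Spark or Iceberg classes. It assumes a table with one data file per `part` value and compares how many files are read with and without pruning.

```scala
// Toy model only: these types are hypothetical, not Spark or Iceberg classes.
final case class DataFile(path: String, part: String, rows: Seq[(Long, String)])

object PushdownSketch {
  // Hypothetical table layout: one data file per partition value.
  val files: Seq[DataFile] = Seq(
    DataFile("part=active/f1.parquet", "active", Seq((1L, "active"))),
    DataFile("part=inactive/f2.parquet", "inactive", Seq((2L, "inactive"), (3L, "inactive")))
  )

  // Without pushdown: every file is read, then rows are filtered in the engine.
  // Returns the matching rows and the number of files read.
  def scanThenFilter(wanted: String): (Seq[(Long, String)], Int) = {
    val rows = files.flatMap(_.rows).filter(_._2 == wanted)
    (rows, files.size)
  }

  // With pushdown: the source prunes files from partition metadata before reading.
  // Returns the matching rows and the number of files read.
  def pruneThenScan(wanted: String): (Seq[(Long, String)], Int) = {
    val selected = files.filter(_.part == wanted)
    (selected.flatMap(_.rows), selected.size)
  }

  def main(args: Array[String]): Unit = {
    val (rowsA, filesA) = scanThenFilter("active")
    val (rowsB, filesB) = pruneThenScan("active")
    assert(rowsA == rowsB) // both strategies return the same rows
    println(s"engine-side filter: files read = $filesA")
    println(s"pushed-down filter: files read = $filesB")
  }
}
```

Both strategies return the same rows; only the number of files touched differs, which is the I/O saving this change targets for each micro-batch.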
### Does this PR introduce _any_ user-facing change?
No. There is no change to the user-facing API.
This change improves performance when a query filters on a partition column.
### How was this patch tested?
- Added PushDownPredicateInMicroBatchExecutionSuite tests
- Manual Testing
```
scala> val streamingDF = spark.readStream
| .format("iceberg")
| .load("local.db.filtered_stream")
| .filter("part = 'active'") // This filter is currently NOT pushed down to Iceberg
|
| val query = streamingDF.writeStream
| .format("console")
| .option("checkpointLocation", s"/tmp/iceberg_stream_${System.currentTimeMillis()}")
| .start()
val streamingDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, part: string]
val query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.runtime.StreamingQueryWrapper@577c2d74
scala> 26/05/03 20:32:57 ERROR SparkScan: [jalpan] creating micro batch stream
26/05/03 20:32:57 ERROR SparkMicroBatchStream: [jalpan] creating micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:59 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:59 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
-------------------------------------------
Batch: 0
-------------------------------------------
+---+------+
| id| part|
+---+------+
| 1|active|
+---+------+
```
### Was this patch authored or co-authored using generative AI tooling?
No
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]