jalpan-randeri opened a new pull request, #55679:
URL: https://github.com/apache/spark/pull/55679

   ### What changes were proposed in this pull request?
   This PR introduces support for Predicate Pushdown in Spark Structured 
Streaming (DataSource V2).
   
   This allows DSv2 connectors (such as Apache Iceberg) to perform metadata-level file pruning and reduce I/O for streaming micro-batches.
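   
   For readers unfamiliar with the connector side, the following is a minimal sketch of how a DSv2 source might accept pushed predicates and hand them to its micro-batch stream. It is not the code in this PR: `ExampleScanBuilder`, `partitionColumns`, and the `newStream` factory are hypothetical names, and it assumes a Spark version in which `SupportsPushDownV2Filters` and `Expression.references()` are available.
   
   ```scala
   import org.apache.spark.sql.connector.expressions.filter.Predicate
   import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownV2Filters}
   import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
   import org.apache.spark.sql.types.StructType
   
   // Hypothetical connector class, for illustration only (not part of this PR).
   class ExampleScanBuilder(
       schema: StructType,
       partitionColumns: Set[String],
       // Assumed factory: builds the connector's stream from the accepted predicates.
       newStream: Array[Predicate] => MicroBatchStream)
     extends SupportsPushDownV2Filters {
   
     private var accepted: Array[Predicate] = Array.empty
   
     // Accept predicates that reference only top-level partition columns.
     // The returned predicates are the ones Spark must still evaluate after the scan.
     override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
       val (pushable, remaining) = predicates.partition { p =>
         p.references().forall { ref =>
           val names = ref.fieldNames()
           names.length == 1 && partitionColumns.contains(names(0))
         }
       }
       accepted = pushable
       remaining
     }
   
     override def pushedPredicates(): Array[Predicate] = accepted
   
     override def build(): Scan = {
       // Snapshot the accepted predicates so the Scan does not depend on later builder state.
       val pushed = accepted
       new Scan {
         override def readSchema(): StructType = schema
         override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
           newStream(pushed)
       }
     }
   }
   ```
   
   In this sketch, any predicate the builder accepts is no longer evaluated by Spark, so the source must apply it exactly (for example, by pruning whole partitions); anything it cannot guarantee should be returned from `pushPredicates` instead.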
   
   ### Why are the changes needed?
   
   Currently, Spark Structured Streaming via the DSv2 API does not push down predicates. As a result, more data is scanned at the source and then filtered out at the engine layer, which leads to excessive I/O, driver bottlenecks, and increased latency.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No. There is no change to the user-facing API.
   This change improves performance when a filter is applied at the partition level.
   
   ### How was this patch tested?
   - Added PushDownPredicateInMicroBatchExecutionSuite tests
   - Manual Testing
   
   ```
   scala> val streamingDF = spark.readStream
        |   .format("iceberg")
        |   .load("local.db.filtered_stream")
        |   .filter("part = 'active'") // This filter is currently NOT pushed down to Iceberg
        |
        | val query = streamingDF.writeStream
        |   .format("console")
        |   .option("checkpointLocation", s"/tmp/iceberg_stream_${System.currentTimeMillis()}")
        |   .start()
   
   val streamingDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, part: string]
   val query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.runtime.StreamingQueryWrapper@577c2d74
   
   scala> 26/05/03 20:32:57 ERROR SparkScan: [jalpan] creating micro batch stream
   26/05/03 20:32:57 ERROR SparkMicroBatchStream: [jalpan] creating micro batch with filter [ref(name="part") == "active"]
   26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
   26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
   26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
   26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
   26/05/03 20:32:59 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
   26/05/03 20:32:59 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
   -------------------------------------------
   Batch: 0
   -------------------------------------------
   +---+------+
   | id|  part|
   +---+------+
   |  1|active|
   +---+------+
   ```
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

