Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-25 Thread via GitHub


adriangb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3006801288

   Great, I'll address 
https://github.com/apache/datafusion/pull/16461#discussion_r2159713329 and then 
I think this will be ready to merge!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-25 Thread via GitHub


kosiew commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3006767151

   @adriangb 
   
   > @kosiew any objections to merging this?
   
   Nope.
   I am excited to see the solution of the puzzle.





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-25 Thread via GitHub


adriangb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3006603280

   @kosiew any objections to merging this?





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-25 Thread via GitHub


adriangb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2998806438

   https://github.com/apache/datafusion/pull/16530 can easily be incorporated 
into this work, completely eliminating what are currently expensive casts.





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-25 Thread via GitHub


adriangb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2162856077


##
datafusion/physical-expr/src/schema_rewriter.rs:
##
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Physical expression schema rewriting utilities
+
+use std::sync::Arc;
+
+use arrow::compute::can_cast_types;
+use arrow::datatypes::{FieldRef, Schema};
+use datafusion_common::{
+    exec_err,
+    tree_node::{Transformed, TransformedResult, TreeNode},
+    Result, ScalarValue,
+};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+
+use crate::expressions::{self, CastExpr, Column};
+
+/// Builder for rewriting physical expressions to match different schemas.
+///
+/// # Example
+///
+/// ```rust
+/// use datafusion_physical_expr::schema_rewriter::PhysicalExprSchemaRewriter;
+/// use arrow::datatypes::Schema;
+///
+/// # fn example(
+/// #     predicate: std::sync::Arc<dyn datafusion_physical_expr::PhysicalExpr>,
+/// #     physical_file_schema: &Schema,
+/// #     logical_file_schema: &Schema,
+/// # ) -> datafusion_common::Result<()> {
+/// let rewriter = PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema);
+/// let adapted_predicate = rewriter.rewrite(predicate)?;
+/// # Ok(())
+/// # }
+/// ```
+pub struct PhysicalExprSchemaRewriter<'a> {
+    physical_file_schema: &'a Schema,
+    logical_file_schema: &'a Schema,
+    partition_fields: Vec<FieldRef>,
+    partition_values: Vec<ScalarValue>,
+}
+
+impl<'a> PhysicalExprSchemaRewriter<'a> {
+    /// Create a new schema rewriter with the given schemas
+    pub fn new(
+        physical_file_schema: &'a Schema,
+        logical_file_schema: &'a Schema,
+    ) -> Self {
+        Self {
+            physical_file_schema,
+            logical_file_schema,
+            partition_fields: Vec::new(),
+            partition_values: Vec::new(),
+        }
+    }
+
+    /// Add partition columns and their corresponding values
+    ///
+    /// When a column reference matches a partition field, it will be replaced
+    /// with the corresponding literal value from partition_values.
+    pub fn with_partition_columns(
+        mut self,
+        partition_fields: Vec<FieldRef>,
+        partition_values: Vec<ScalarValue>,
+    ) -> Self {
+        self.partition_fields = partition_fields;
+        self.partition_values = partition_values;
+        self
+    }
+
+    /// Rewrite the given physical expression to match the target schema
+    ///
+    /// This method applies the following transformations:
+    /// 1. Replaces partition column references with literal values
+    /// 2. Handles missing columns by inserting null literals
+    /// 3. Casts columns when logical and physical schemas have different types
+    pub fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
+        expr.transform(|expr| self.rewrite_expr(expr)).data()
+    }
+
+    fn rewrite_expr(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+            return self.rewrite_column(Arc::clone(&expr), column);
+        }
+
+        Ok(Transformed::no(expr))
+    }
+
+    fn rewrite_column(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+        column: &Column,
+    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+        // Get the logical field for this column
+        let logical_field = match self.logical_file_schema.field_with_name(column.name()) {
+            Ok(field) => field,
+            Err(e) => {
+                // If the column is a partition field, we can use the partition value
+                if let Some(partition_value) = self.get_partition_value(column.name()) {
+                    return Ok(Transformed::yes(expressions::lit(partition_value)));
+                }
+                // If the column is not found in the logical schema and is not a partition
+                // value, return an error. This should probably never be hit unless something
+                // upstream broke, but nonetheless it's better for us to return a handleable
+                // error than to panic / do something unexpected.
+                return Err(e.into());
+            }
+        };
+
+        // Check if the column exists in the physical schema
+        let physical_column_index =
+            match self.physica
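
The rewrite loop in this excerpt is driven by DataFusion's `TreeNode::transform` traversal. As a hedged, std-only sketch of the same column-replacement idea — using a toy expression enum rather than the real `Arc<dyn PhysicalExpr>` machinery (all names below are hypothetical) — missing columns become null literals and type-mismatched columns get wrapped in a cast:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    NullLiteral,             // stands in for `lit(ScalarValue::Null)`
    Cast(Box<Expr>, String), // cast to a named target type
    And(Box<Expr>, Box<Expr>),
}

/// Rewrite leaf columns bottom-up: columns absent from the physical schema
/// become null literals, columns whose physical type differs from the logical
/// type are wrapped in a cast, everything else is left untouched.
fn rewrite(
    expr: Expr,
    physical: &HashMap<String, String>, // name -> type on disk
    logical: &HashMap<String, String>,  // name -> type in the table schema
) -> Expr {
    match expr {
        Expr::Column(name) => match (physical.get(&name), logical.get(&name)) {
            (None, Some(_)) => Expr::NullLiteral, // absent on disk -> null
            (Some(p), Some(l)) if p != l => {
                // type changed -> cast to the logical type
                Expr::Cast(Box::new(Expr::Column(name)), l.clone())
            }
            _ => Expr::Column(name),
        },
        Expr::Cast(inner, ty) => Expr::Cast(Box::new(rewrite(*inner, physical, logical)), ty),
        Expr::And(a, b) => Expr::And(
            Box::new(rewrite(*a, physical, logical)),
            Box::new(rewrite(*b, physical, logical)),
        ),
        other => other,
    }
}

fn main() {
    let physical = HashMap::from([("a".to_string(), "Int32".to_string())]);
    let logical = HashMap::from([
        ("a".to_string(), "Int64".to_string()),
        ("b".to_string(), "Utf8".to_string()), // added after this file was written
    ]);
    let pred = Expr::And(
        Box::new(Expr::Column("a".into())),
        Box::new(Expr::Column("b".into())),
    );
    println!("{:?}", rewrite(pred, &physical, &logical));
}
```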

Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-25 Thread via GitHub


adriangb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3004630417

   Thank you very much for the feedback @kosiew 🙏🏻! I don't mean to disregard 
it, you make great points, but I think they are surmountable! Let's move 
forward with this and keep iterating on the approaches in parallel. If it turns 
out that this approach won't work for projection evaluation, it's still clearly 
a win for predicate evaluation.





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-25 Thread via GitHub


adriangb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3004580959

   > Every invocation in the evaluator will loop over rows to build child 
arrays, then pack them into a StructArray
   
   As far as I know a PhysicalExpr can operate at the array level. For example 
`lit(ScalarValue::Null).into_array(N)` will end up calling `new_null_array` as 
well after a couple of function call hops: 
   
   
https://github.com/apache/datafusion/blob/e3d3302161d382b9219c4536ad5ec0ce93690ba8/datafusion/common/src/scalar/mod.rs#L2672
   
   I think 3-4 function call hops would be an issue if they happened for every 
row, but this happens at the array level - it's going to be inconsequential 
compared to the I/O happening, etc.
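   
   To make the array-level point concrete, here is a std-only sketch (a plain 
`Vec` stands in for an Arrow array, and `scalar_into_array` is a hypothetical 
helper): the literal expands to a column once per batch in a single bulk fill, 
with no expression-tree traversal inside the per-row loop.
   
   ```rust
   // A literal expands to an array once per batch, not once per row. In
   // DataFusion this is `ColumnarValue::Scalar(..).into_array(num_rows)`
   // bottoming out in `new_null_array`; here a Vec stands in for the array.
   fn scalar_into_array(scalar: Option<i64>, num_rows: usize) -> Vec<Option<i64>> {
       // One bulk fill per RecordBatch: the per-row cost is just the copy.
       vec![scalar; num_rows]
   }
   
   fn main() {
       let nulls = scalar_into_array(None, 8192); // typical batch size
       assert!(nulls.iter().all(|v| v.is_none()));
   }
   ```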
   
   > There’s no built-in “struct constructor” expression in DataFusion
   
   Isn't there 
https://datafusion.apache.org/user-guide/sql/scalar_functions.html#struct? I'm 
sure we can call that ourselves without SQL: 
   
   
https://github.com/apache/datafusion/blob/e3d3302161d382b9219c4536ad5ec0ce93690ba8/datafusion/functions/src/core/struct.rs#L53-L71





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-24 Thread via GitHub


kosiew commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3003420880

   hi @adriangb 
   
   > could you take a look at 
https://github.com/apache/datafusion/commit/32725dd621ec5e96caf1970433f3549dca977a80?
   
   👍👍👍
   The new tests in `PhysicalExprSchemaRewriter`’s suite—including your 
“`test_adapt_batches`” example—*do* demonstrate that we can:
   
   1. **Rewrite** a projection (or filter) against a physical `RecordBatch`,
   
   2. **Evaluate** those rewritten `PhysicalExpr`s on the old batch to
   
   3. **Produce** a brand-new `RecordBatch` that (a) injects nulls for missing 
top-level columns, (b) applies casts, and (c) drops extra columns—all in one go.
   
   So yes, for **flat** schemas and simple projections/filters, we can entirely 
sidestep a separate `map_batch` / `cast_struct_column` step by:
   
   - Generating an expression per target column (either `col(...)` if present 
or `lit(NULL)` if absent),
   - Letting the engine “evaluate” those expressions into new arrays, and
   - Bundling them into a fresh `RecordBatch`.
   
   ✅ **Where the rewrite-only approach shines**
   
   - **Simplicity** for top-level columns. We only need `col` + `lit(NULL)` + 
`CastExpr`.
   - **Unified code path**: predicates *and* projections both go through the 
same rewriter + evaluator.
   - **Less bespoke iterator logic**: no custom `StructArray` walks, no 
recursive field-matching loops.
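   
   The "col(...) or lit(NULL), plus casts" plan above can be written down as a 
tiny std-only function (hypothetical names; the real rewriter would emit 
`Arc<dyn PhysicalExpr>` values instead of this enum):
   
   ```rust
   use std::collections::HashMap;
   
   #[derive(Debug, PartialEq)]
   enum ColumnPlan {
       Keep(String),           // col("name"): physical type already matches
       CastTo(String, String), // cast(col("name"), logical type)
       NullFill,               // lit(NULL): column missing from the file
   }
   
   /// One plan entry per logical column, in logical-schema order.
   fn plan_columns(
       logical: &[(&str, &str)],       // (name, logical type)
       physical: &HashMap<&str, &str>, // name -> physical type on disk
   ) -> Vec<ColumnPlan> {
       logical
           .iter()
           .map(|(name, lty)| match physical.get(name) {
               None => ColumnPlan::NullFill,
               Some(pty) if pty != lty => {
                   ColumnPlan::CastTo(name.to_string(), lty.to_string())
               }
               Some(_) => ColumnPlan::Keep(name.to_string()),
           })
           .collect()
   }
   
   fn main() {
       let physical = HashMap::from([("a", "Int32"), ("extra", "Utf8")]);
       // Logical schema: `a` widened to Int64, new column `b`; `extra` dropped.
       let plans = plan_columns(&[("a", "Int64"), ("b", "Utf8")], &physical);
       println!("{plans:?}");
   }
   ```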
   
   ---
   
   ⚠️ **Where the schema-adapter approach still wins**
   
   1. **Deeply nested structs**
   
  - There’s *no* built-in “struct constructor” expression in DataFusion’s 
evaluator that I know of.
Our rewrite + `batch_project` hack only handles top-level arrays. We 
can’t easily say *“build a `StructArray` whose fields are (`col("a.b")`, 
`lit(NULL)`, `cast(col("a.c"))`, …)”* purely in expression form.
   
   2. **Performance**
   
  - Expression evaluation involves building `ArrayRef`s by walking millions 
of rows through the `PhysicalExpr` vtable.
  - The adapter’s `cast_struct_column` does one recursive scan through each 
`StructArray`'s memory, which is far more cache-friendly for bulk columnar 
operations.
   
   3. **Full schema fidelity**
   
  - The rewrite test only demonstrates:
- *Drop* “extra” columns,
- *Inject null* for missing *top-level* columns,
- *Cast* primitive types.
   
  - It doesn’t cover:
   
- **Adding** a new nested struct (we’d need to build that `StructArray` 
via expressions we don’t have),
- **Recursively** updating sub-children,
- **Preserving** null bit-maps across nested levels.
   
   > > Complex handling for deeply nested types.
   >
   > I do think this is a concern, I'm not sure how hard it would be to actually 
implement, but it's theoretically very possible
   
   ### Why it’s possible but a lot of work (and a performance risk)
   
   1. **Engineering effort**
   
  - We’ll be growing the rewriter from a column/cast replacer into a full 
recursive schema-walker + struct-node constructor.
   
  - We’ll have to handle every corner case: non-nullable nested fields, 
mixed present+missing children, ordering of fields, metadata, etc.
   
   2. **Runtime performance**
   
  - Every invocation in the evaluator will loop over *rows* to build child 
arrays, then pack them into a `StructArray`.
   
  - That’s orders of magnitude slower than a tight `cast_struct_column` 
implementation that does one bulk pass through the existing `StructArray` 
buffers.
   
   
   I hope I don't sound like I am dead against the rewrite approach.
   It is more like you have shown me a puzzle that I don't know how to solve.
   
   ### What I would love to hear
   
   *"Here's a simpler and faster approach."*
   





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-24 Thread via GitHub


adriangb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3000255690

   @kosiew I'm not sure I agree with the conclusions there. Why can't we use 
expressions to do the schema adapting during the scan? It's very possible, as 
@alamb pointed out in 
https://github.com/apache/datafusion/pull/16461#issuecomment-2997870791, to feed 
a RecordBatch into an expression and get back a new array. So unless I'm 
missing something I don't think these are correct:
   
   > Expression rewriting is great for pushdown but batch-level adapters are 
needed for correct, shaped data.
   > No effect on RecordBatch structure.
   > Limited scope (only predicates and pruning).
   
   > Possibly poorer performance due to repeated expression rewrites.
   
   There are no more expression rewrites than there are SchemaAdapters created. 
Those aren't cached either and are created for each file.
   
   I'll put together an example to show how predicate rewrites can be used to 
reshape data. But also FWIW that's exactly how ProjectionExec works.
   
   





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-24 Thread via GitHub


adriangb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3000698443

   @kosiew could you take a look at 32725dd?
   
   > Complex handling for deeply nested types.
   
   I do think this is a concern, I'm not sure how hard it would be to actually 
implement, but it's theoretically very possible and I think we should be able 
to make it easy to implement with some elbow grease / the right helpers and 
abstractions.
   





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-24 Thread via GitHub


kosiew commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3000612368

   @adriangb ,
   Sorry, it was not my intention to presume the conclusions.
   
   I do look forward to a solution that handles schema adaptation in one pass.
   





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-24 Thread via GitHub


kosiew commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2999138805

   Adding notes for future reference:
   
   ---
   
   # Summary: Adapting Filter Expressions to File Schema During Parquet Scan
   
   ---
   
   ## Background & Goal
   
   - Apache DataFusion wants to improve how filter expressions (predicates) and 
projections are adapted to the **physical file schema** during Parquet scans.
   - The effort aims to:
 - Move closer to handling nested struct schema evolution (#15780).
 - Replace older `SchemaAdapter` machinery with a new builder-based 
approach.
 - Support expression rewrites for projection and selection pushdown 
(#14993, #15057).
 - Make it easier to work with schema evolution on nested structs (#15821).
 - Enable simpler hooks for handling missing columns and expression 
transformations.
   
   ---
   
   ## Key Concepts
   
   ### 1. Expression Rewriting (Pushdown Adaptation)
   
   - Rewrites filter and projection expressions to align with the **file’s 
physical schema**.
   - Examples:
 - If an expression refers to a nested field `foo.baz` that is missing on 
disk → rewrite to `lit(NULL)`.
 - If a field has different physical type on disk vs. logical schema → add 
casts.
   - This rewriting ensures that predicate pushdown logic and filters do not 
error out when the *on-disk* schema differs from the *logical* schema.
   - Expression rewriting happens **before** reading data and uses the physical 
schema to safely prune row groups.
   
   ### 2. Data Adaptation (Batch-Level Reshaping)
   
   - After reading a `RecordBatch` or arrays from Parquet, reshape them to 
match the **logical table schema**.
   - Actions include:
 - Adding null arrays for missing nested fields (nested struct imputation).
 - Dropping columns no longer part of the logical schema.
 - Recursively casting nested struct types to match the logical type.
   - This ensures downstream operators receive data shaped exactly as expected 
in the query, despite schema evolution.
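   
   The actions above amount to a single pass over the columns, which can be 
sketched std-only (a name-to-`Vec` map stands in for Arrow's `RecordBatch`; 
`adapt_batch` is a hypothetical helper, not a DataFusion API):
   
   ```rust
   use std::collections::HashMap;
   
   /// Reshape an in-memory batch to the logical schema in one pass:
   /// null-fill columns missing from the file and drop extras.
   fn adapt_batch(
       batch: &HashMap<String, Vec<Option<i64>>>,
       logical_columns: &[&str],
       num_rows: usize,
   ) -> Vec<(String, Vec<Option<i64>>)> {
       logical_columns
           .iter()
           .map(|name| {
               let col = batch
                   .get(*name)
                   .cloned() // present on disk: take the column as-is
                   .unwrap_or_else(|| vec![None; num_rows]); // missing: all-null
               (name.to_string(), col)
           })
           .collect() // columns absent from `logical_columns` are dropped
   }
   
   fn main() {
       let mut batch = HashMap::new();
       batch.insert("a".to_string(), vec![Some(1), Some(2)]);
       batch.insert("extra".to_string(), vec![Some(9), Some(9)]); // dropped
       let adapted = adapt_batch(&batch, &["a", "b"], 2);
       println!("{adapted:?}");
   }
   ```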
   
   ---
   
   ## Main Discussion Points
   
   | Topic | Details |
   | --- | --- |
   | **Proposed Approach** | Introduce a `PhysicalExprSchemaRewriter` builder to adapt expressions to file schema during pruning/scanning. |
   | **Nested Struct Imputation** | Expression-only rewrites are limited for nested structs since they do not modify the actual data arrays. |
   | **Data vs. Expression Adaptation** | Expression rewriting is great for pushdown but batch-level adapters are needed for correct, shaped data. |
   | **Complementary Approach** | Use expression rewriting for filters/projections + array-level adapters (e.g. `cast_struct_column`) to reshape in-memory data. |
   | **Projection Pushdown Scenario** | A scan can receive full projection expressions (e.g. `a + b`), which get adapted and evaluated on RecordBatch, producing final output. |
   | **Risks of Expression-Only Rewrites** | - No effect on RecordBatch structure.<br>- Limited scope (only predicates and pruning).<br>- Risk of code duplication.<br>- Complex handling for deeply nested types.<br>- Possibly poorer performance due to repeated expression rewrites. |
   | **Potential Benefits of Expression-Based Rewrites** | Cleaner pruning path, simpler code, no fake batches for evaluation, reusable visitor pattern. |
   
   ---
   
   ## Diagram: Data Adaptation vs. Expression Rewriting Flow
   
   ```text
   +---------------+    +----------------+    +------------------+
   | Query Logical |    | File Physical  |    | In-Memory Batch  |
   | Schema        |    | Schema         |    | (RecordBatch)    |
   | (e.g. table)  |    | (Parquet file) |    |                  |
   +---------------+    +----------------+    +------------------+
           |                    |                      |
           |  Expression Rewriting                     |
           | <--- adapts ---    |                      |
   ```
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-23 Thread via GitHub


adriangb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2998670275

   I opened https://github.com/apache/datafusion/issues/16528 to track further 
ideas / steps





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-23 Thread via GitHub


alamb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2162492604


##
datafusion/physical-expr/src/schema_rewriter.rs:
##
(quotes the same `schema_rewriter.rs` excerpt reproduced earlier in this thread)

Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-23 Thread via GitHub


adriangb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2162548231


##
datafusion/physical-expr/src/schema_rewriter.rs:
##
(quotes the same `schema_rewriter.rs` excerpt reproduced earlier in this thread)

Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-23 Thread via GitHub


alamb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2997873713

   I would personally recommend proceeding in parallel with the two approaches, 
ensuring there are good end to end tests (.slt) -- and then if we find that the 
projection pushdown / rewriting code can subsume the schema adapter code we 
could make a PR to do that 🤔 





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-23 Thread via GitHub


alamb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2997870791

   > But we’d still need an array-level counterpart to actually materialize 
those null nested fields in the RecordBatch when we call map_batch.
   
   FWIW I think this is one mechanism to turn the expression into an array (you 
just need to evaluate the expression into a PhysicalExpr):
   - 
https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.ColumnarValue.html#method.into_array





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-23 Thread via GitHub


adriangb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2996225226

   Thanks for the thoughtful reply. An important point that I forgot to 
mention: we're actively working on projection / selection pushdown which would 
involve pushing down expressions into the scan, thus we *would* evaluate the 
expressions and materialize the record batches in the output.





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-23 Thread via GitHub


kosiew commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2994947013

   @adriangb 
   Thanks for the ping on this.
   
   > Would it be possible to implement the nested struct imputation work you're 
doing with this approach?
   
   Do you mean  reusing the PhysicalExprSchemaRewriter machinery to drive 
nested‐struct imputation?
   
   Here are some disadvantages of the rewrite-centric approach versus the more 
data-centric adapter approach:
   
   - Expression-only, not data-only: This never actually transforms the 
underlying RecordBatch columns—if downstream logic (or the user) inspects a 
struct column directly, they won’t see the new null fields injected. We’d still 
need array-level imputation for correctness in the result batches.
   
   - Limited to predicate contexts: The rewriter hooks into filter and pruning, 
but our broader schema-evolution needs (e.g. reading all columns, SELECT *, 
writing out evolved nested structs) fall outside its scope.
   
   - Duplication risk: We end up reinventing part of the schema-adapter’s 
compatibility logic (matching fields by name, casting types) inside the 
rewriter, which can drift from the adapter’s rules over time.
   
   - Complexity with deep nesting: Recursively handling deeply nested structs 
inside an expression tree—and ensuring every nested‐field access gets rewritten 
with the right shape—quickly becomes more intricate than a simple tree visitor.
   
   - Performance implications: Constantly rewriting and reconstructing 
expression trees (and then evaluating those casts/lits) might be less efficient 
than bulk array‐level casts + struct builds, especially on wide tables.
   
   
   So, could we bolt nested-struct imputation onto this rewriter? Technically 
yes: we could extend rewrite_column so that, whenever we see a Column referring 
to foo.bar.baz that's missing in the physical schema, we generate a 
Literal::Null of the full nested type (constructing the proper StructValue). 
But we'd still need an array-level counterpart to actually materialize those 
null nested fields in the RecordBatch when we call map_batch.
   
   In practice, the two approaches complement each other:
   
   Use the rewriter to handle predicate and projection expressions (so filters 
and column references don’t blow up).
   
   Continue to rely on  cast_struct_column + NestedStructSchemaAdapter to adapt 
the actual batch data, filling in null arrays and doing recursive casts.
   
   That way we get the best of both worlds—clean, centralized expression 
rewriting for pushdown, and robust array-level marshalling for the final 
tables. 😊
   
   ## Why the two-pronged approach makes sense
   
   1. Pushdown vs. Data Adaptation Are Different Concerns
   
   The PhysicalExprSchemaRewriter is perfect for rewriting predicates and 
projections so they don’t blow up when the on-disk schema diverges.
   
   But once you’ve read that Parquet row group into memory, you still need to 
reshape the StructArray itself—filling in null arrays for new nested fields, 
dropping old ones, recursively casting types.
   
   2. Keeping Pruning Code Lean
   
   Swapping out the old SchemaMapper for the rewriter in your pruning path is a 
great win: much less boilerplate, better separation of concerns, and no more 
“fake record batches” just to evaluate a filter.
   
   You can remove all of that pruning-specific adapter code and lean on the 
rewriter’s tree visitor.
   
   3. Deep Schema-Evolution Still Lives in the Adapter
   
   Handling a top-level missing column is easy in the rewriter (you just 
rewrite col("foo") to lit(NULL)), but handling col("a.b.c") where b itself is a 
new struct, and c is a new field inside it… that’s far more natural in a 
recursive cast_struct_column that operates on StructArray children.





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-23 Thread via GitHub


kosiew commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2994965537

   Putting more words to how I understand pushdown and data adaptation:
   
   1. Pushdown — “Which rows or pages should I read?”
   - Input: your original predicate (e.g. col("foo.b") > 5) and the physical 
Parquet schema.
   
   - What the rewriter does:
  - Sees that foo.b doesn’t exist on disk → replaces col("foo.b") > 5 with 
lit(NULL) > 5.
  - Or if foo.a is stored as Int32 but the table expects Int64, it wraps 
col("foo.a") in a cast.
   
   - Result: you get a “safe” predicate that Parquet can evaluate against 
row‐group statistics or pages without error.
   
   - Outcome: you prune away unneeded row groups, or skip pages, based on that 
rewritten expression.
   
   At the end of this step, no data has actually been materialized—you’ve only 
modified the expression you use to decide what to read.
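   As a minimal, self-contained sketch of this rewrite step (toy `Expr` and 
`rewrite_for_file` stand-ins invented for illustration, not the actual 
DataFusion `PhysicalExpr`/`TreeNode` API), replacing references to columns that 
are missing from the physical file schema with a null literal looks roughly 
like:

```rust
// Toy expression tree; a stand-in for DataFusion's `PhysicalExpr`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    IntLiteral(i64),
    NullLiteral,
    Gt(Box<Expr>, Box<Expr>),
}

/// Rewrite a predicate so it only references columns present in the
/// physical (file) schema: a missing column becomes a null literal.
fn rewrite_for_file(expr: Expr, physical_columns: &[&str]) -> Expr {
    match expr {
        // Missing column -> NULL literal, mirroring what the rewriter does.
        Expr::Column(name) if !physical_columns.contains(&name.as_str()) => {
            Expr::NullLiteral
        }
        // Recurse into children of compound expressions.
        Expr::Gt(l, r) => Expr::Gt(
            Box::new(rewrite_for_file(*l, physical_columns)),
            Box::new(rewrite_for_file(*r, physical_columns)),
        ),
        // Everything else (literals, columns that do exist) is left alone.
        other => other,
    }
}

fn main() {
    // The file only has `foo_a`, so `foo_b > 5` is rewritten to `NULL > 5`.
    let pred = Expr::Gt(
        Box::new(Expr::Column("foo_b".into())),
        Box::new(Expr::IntLiteral(5)),
    );
    let rewritten = rewrite_for_file(pred, &["foo_a"]);
    assert_eq!(
        rewritten,
        Expr::Gt(Box::new(Expr::NullLiteral), Box::new(Expr::IntLiteral(5)))
    );
    println!("{rewritten:?}");
}
```

   In the real rewriter the same idea is expressed as a tree transform over 
`Arc<dyn PhysicalExpr>`, with the null literal cast to the logical field's 
data type.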
   
   2. Data adaptation — “How do I shape the in-memory batch to match the 
logical schema?”
   - Input: a RecordBatch (or StructArray) that you read directly from Parquet.
   
 - This batch is laid out exactly as on disk: it only has the columns that 
existed in that file’s schema, and nested structs only contain the old fields.
   
   - What the adapter does (map_batch / cast_struct_column):
 - Field matching: for each field in your logical (table) schema, look it 
up by name in the batch’s arrays.
 - Missing fields → insert a new_null_array(...) of the right datatype and 
row count.
 - Extra fields (present on disk but dropped in the table) → ignore them.
 - Nested structs → recurse into child struct arrays, doing the same 
match/fill/ignore/cast logic at each level.
   
   - Result: a brand-new StructArray (and overall RecordBatch) whose columns 
exactly line up with your table schema—even for deeply nested new fields.
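   A similarly toy sketch of that match/fill/ignore/recurse logic (plain Rust 
stand-ins rather than arrow-rs `StructArray`s; `Value`, `FieldType`, and 
`adapt` are invented here purely for illustration):

```rust
use std::collections::BTreeMap;

// Toy stand-ins for Arrow values and data types; the real adapter works on
// `StructArray`s, but the match/fill/ignore/recurse shape is the same.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(i64),
    Null,
    Struct(BTreeMap<String, Value>),
}

#[derive(Debug, Clone)]
enum FieldType {
    Int,
    Struct(Vec<(String, FieldType)>),
}

/// Reshape a value read from the file to match the logical field type:
/// missing fields become nulls, extra fields are dropped (by simply never
/// being looked up), and nested structs are adapted recursively.
fn adapt(value: Option<&Value>, ty: &FieldType) -> Value {
    match (ty, value) {
        (FieldType::Int, Some(Value::Int(v))) => Value::Int(*v),
        (FieldType::Struct(fields), Some(Value::Struct(children))) => Value::Struct(
            fields
                .iter()
                .map(|(name, t)| (name.clone(), adapt(children.get(name), t)))
                .collect(),
        ),
        // Missing, null, or mismatched leaf field -> null.
        (FieldType::Int, _) => Value::Null,
        // Missing struct -> a struct whose children are all null, so that
        // nested projections still resolve.
        (FieldType::Struct(fields), _) => Value::Struct(
            fields
                .iter()
                .map(|(name, t)| (name.clone(), adapt(None, t)))
                .collect(),
        ),
    }
}

fn main() {
    // File batch has `foo = { a: 1 }`, but the table schema expects
    // `foo = { a: Int, b: Int }`, so `b` is filled with null.
    let file_value = Value::Struct(BTreeMap::from([("a".to_string(), Value::Int(1))]));
    let logical_ty = FieldType::Struct(vec![
        ("a".to_string(), FieldType::Int),
        ("b".to_string(), FieldType::Int),
    ]);
    let adapted = adapt(Some(&file_value), &logical_ty);
    let expected = Value::Struct(BTreeMap::from([
        ("a".to_string(), Value::Int(1)),
        ("b".to_string(), Value::Null),
    ]));
    assert_eq!(adapted, expected);
    println!("{adapted:?}");
}
```

   In the real adapter the leaf "null fill" is `new_null_array(...)` with the 
right row count, and type mismatches become casts instead of nulls.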





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-22 Thread via GitHub


adriangb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2994216351

   @kosiew I'm curious what you think about this. Would it be possible to 
implement the nested struct imputation work you're doing with this approach?





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-22 Thread via GitHub


adriangb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2994242282

   Just for fun I opened https://github.com/pydantic/datafusion/pull/31 to see 
how hard it would be to incorporate 
https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928, not 
too bad! Most of the code could be shared with the logical layer with some 
refactoring, `pub`ing, etc.





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-22 Thread via GitHub


adriangb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2994214075

   My hopes with this work is that we can:
   - Make it easier to do further optimizations like 
https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928
   - Replace the existing SchemaAdapter machinery
   - Add hooks to this builder for `with_missing_column_handling(...)` to close 
#15261 and `visit_expression(f: Fn(Arc<dyn PhysicalExpr>, &Schema) -> 
Transformed<Arc<dyn PhysicalExpr>>)` to close #15057
   - Make it easier to work in #15821, since we scan at the file schema





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-21 Thread via GitHub


adriangb commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2993667691

   @alamb I've created the builder, moved the implementation and added some 
unit tests





Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-20 Thread via GitHub


adriangb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2159712965


##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -524,6 +539,84 @@ fn should_enable_page_index(
     .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(

Review Comment:
   Sounds good to me, I will work on this next! Agreed on the unit tests 😄 






Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-20 Thread via GitHub


adriangb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2159713329


##
datafusion/datasource-parquet/src/row_filter.rs:
##
@@ -520,111 +489,15 @@ mod test {
         let expr = col("int64_list").is_not_null();
         let expr = logical2physical(&expr, &table_schema);
 
-        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
         let table_schema = Arc::new(table_schema.clone());
 
-        let candidate = FilterCandidateBuilder::new(
-            expr,
-            table_schema.clone(),
-            table_schema,
-            schema_adapter_factory,
-        )
-        .build(metadata)
-        .expect("building candidate");
+        let candidate = FilterCandidateBuilder::new(expr, table_schema.clone())
+            .build(metadata)
+            .expect("building candidate");
 
         assert!(candidate.is_none());
     }
 
-    #[test]
-    fn test_filter_type_coercion() {

Review Comment:
   I think so - I'll have to make it more e2e since it is no longer specific to 
the row filter. We have some other similar tests, I'll work this into there






Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-20 Thread via GitHub


alamb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2159679800


##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -524,6 +539,84 @@ fn should_enable_page_index(
     .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    physical_file_schema: &Schema,
+    logical_file_schema: &Schema,
+    partition_values: Vec<ScalarValue>,
+    partition_fields: &[FieldRef],
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+            let logical_field = match logical_file_schema.field_with_name(column.name()) {
+                Ok(field) => field,
+                Err(e) => {
+                    // If the column is a partition field, we can use the partition value
+                    for (partition_field, partition_value) in
+                        partition_fields.iter().zip(partition_values.iter())
+                    {
+                        if partition_field.name() == column.name() {
+                            return Ok(Transformed::yes(expressions::lit(
+                                partition_value.clone(),
+                            )));
+                        }
+                    }
+                    // If the column is not found in the logical schema, return an error.
+                    // This should probably never be hit unless something upstream broke,
+                    // but nonetheless it's better for us to return a handleable error
+                    // than to panic / do something unexpected.
+                    return Err(e.into());
+                }
+            };
+            let Ok(physical_field) = physical_file_schema.field_with_name(column.name())
+            else {
+                if !logical_field.is_nullable() {
+                    return exec_err!(
+                        "Non-nullable column '{}' is missing from the physical schema",
+                        column.name()
+                    );
+                }
+                // If the column is missing from the physical schema fill it in with
+                // nulls as `SchemaAdapter` would do.
+                // TODO: do we need to sync this with what the `SchemaAdapter` actually does?
+                // While the default implementation fills in nulls, in theory a custom
+                // `SchemaAdapter` could do something else!
+                let value = ScalarValue::Null.cast_to(logical_field.data_type())?;
+                return Ok(Transformed::yes(expressions::lit(value)));
+            };
+
+            if logical_field.data_type() == physical_field.data_type() {
+                return Ok(Transformed::no(expr));
+            }
+
+            // If the logical field and physical field are different, we need to cast
+            // the column to the logical field's data type.
+            // We will try later to move the cast to literal values if possible, which
+            // is computationally cheaper.

Review Comment:
   👍 



##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -524,6 +539,84 @@ fn should_enable_page_index(
     .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    physical_file_schema: &Schema,
+    logical_file_schema: &Schema,
+    partition_values: Vec<ScalarValue>,
+    partition_fields: &[FieldRef],
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+            let logical_field = match logical_file_schema.field_with_name(column.name()) {
+                Ok(field) => field,
+                Err(e) => {
+                    // If the column is a partition field, we can use the partition value
+                    for (partition_field, partition_value) in
+                        partition_fields.iter().zip(partition_values.iter())
+                    {
+                        if partition_field.name() == column.name() {
+                            return Ok(Transformed::yes(expressions::lit(
+                                partition_value.clone(),
+                            )));
+                        }
+

Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-19 Thread via GitHub


adriangb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2157528156


##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -879,4 +972,107 @@ mod test {
         assert_eq!(num_batches, 0);
         assert_eq!(num_rows, 0);
     }
+
+    #[tokio::test]
+    async fn test_prune_on_partition_value_and_data_value() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // Note: number 3 is missing!
+        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(4)])).unwrap();
+        let data_size =
+            write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;
+
+        let file_schema = batch.schema();
+        let mut file = PartitionedFile::new(
+            "part=1/file.parquet".to_string(),
+            u64::try_from(data_size).unwrap(),
+        );
+        file.partition_values = vec![ScalarValue::Int32(Some(1))];
+
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("part", DataType::Int32, false),
+            Field::new("a", DataType::Int32, false),
+        ]));
+
+        let make_opener = |predicate| {
+            ParquetOpener {
+                partition_index: 0,
+                projection: Arc::new([0]),
+                batch_size: 1024,
+                limit: None,
+                predicate: Some(predicate),
+                logical_file_schema: file_schema.clone(),
+                metadata_size_hint: None,
+                metrics: ExecutionPlanMetricsSet::new(),
+                parquet_file_reader_factory: Arc::new(
+                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
+                ),
+                partition_fields: vec![Arc::new(Field::new(
+                    "part",
+                    DataType::Int32,
+                    false,
+                ))],
+                pushdown_filters: true, // note that this is true!
+                reorder_filters: true,
+                enable_page_index: false,
+                enable_bloom_filter: false,
+                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
+                enable_row_group_stats_pruning: false, // note that this is false!
+                coerce_int96: None,
+            }
+        };
+
+        let make_meta = || FileMeta {
+            object_meta: ObjectMeta {
+                location: Path::from("part=1/file.parquet"),
+                last_modified: Utc::now(),
+                size: u64::try_from(data_size).unwrap(),
+                e_tag: None,
+                version: None,
+            },
+            range: None,
+            extensions: None,
+            metadata_size_hint: None,
+        };
+
+        // Filter should match the partition value and data value
+        let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
+        let predicate = logical2physical(&expr, &table_schema);
+        let opener = make_opener(predicate);
+        let stream = opener
+            .open(make_meta(), file.clone())
+            .unwrap()
+            .await
+            .unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_batches, 1);
+        assert_eq!(num_rows, 3);
+
+        // Filter should match the partition value but not the data value
+        let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
+        let predicate = logical2physical(&expr, &table_schema);
+        let opener = make_opener(predicate);
+        let stream = opener.open(make_meta(), file.clone()).unwrap().await.unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_batches, 1);
+        assert_eq!(num_rows, 3);
+
+        // Filter should not match the partition value but match the data value
+        let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
+        let predicate = logical2physical(&expr, &table_schema);
+        let opener = make_opener(predicate);
+        let stream = opener.open(make_meta(), file.clone()).unwrap().await.unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_batches, 1);
+        assert_eq!(num_rows, 1);
Review Comment:
   This assertion fails on `main`: all 3 rows are passed because the row filter 
cannot handle the partition columns. This PR somewhat coincidentally happens to 
allow the row filter to handle predicates that depend on partition and data 
columns!







Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]

2025-06-19 Thread via GitHub


adriangb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2157496405


##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -524,6 +532,62 @@ fn should_enable_page_index(
     .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    physical_file_schema: &Schema,
+    logical_file_schema: &Schema,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+            let logical_field = logical_file_schema.field_with_name(column.name())?;
+            let Ok(physical_field) = physical_file_schema.field_with_name(column.name())
+            else {
+                if !logical_field.is_nullable() {
+                    return exec_err!(
+                        "Non-nullable column '{}' is missing from the physical schema",
+                        column.name()
+                    );

Review Comment:
   Might be useful to include some sort of file identifier here?


