huaxingao commented on a change in pull request #35691:
URL: https://github.com/apache/spark/pull/35691#discussion_r818864088
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
##########
@@ -83,7 +89,7 @@ private[sql] object PruneFileSourcePartitions
HadoopFsRelation(
catalogFileIndex: CatalogFileIndex,
Review comment:
Right, the key is to do partition pruning only once, but the root cause
of the infinite partition pruning in 3.2 is that the partition filters
and data filters are not separated correctly.
Here is the physical plan for file source v1 in 3.2, where partition filters
and data filters ARE separated correctly:
```
== Physical Plan ==
*(1) Filter (((p#10 = 0) AND (id#9L > 0)) OR ((p#10 = 1) AND (id#9L = 2)))
+- *(1) ColumnarToRow
   +- FileScan parquet [id#9L,p#10] Batched: true, DataFilters: [((id#9L > 0) OR (id#9L = 2))], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/pt/_5f4sxy56x70dv9zpz032f0m0000gn/T/spark-b0..., PartitionFilters: [((p#10 = 0) OR (p#10 = 1))], PushedFilters: [Or(GreaterThan(id,0),EqualTo(id,2))], ReadSchema: struct<id:bigint>
```
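To make the correct separation concrete, here is a toy sketch (my own simplified expression tree, not Spark's API; it roughly mirrors the idea behind `PredicateHelper.extractPredicatesWithinOutputSet`). Splitting `((p = 0 AND id > 0) OR (p = 1 AND id = 2))` must *weaken* the predicate on each side: a conjunct touching the other column set can be dropped, but an OR branch can only be kept if BOTH branches yield something on the allowed columns:

```scala
// Toy expression tree for illustration only (not Spark's actual classes).
sealed trait Expr
case class Col(name: String) extends Expr
case class Cmp(col: Col, op: String, v: Int) extends Expr
case class And(l: Expr, r: Expr) extends Expr
case class Or(l: Expr, r: Expr) extends Expr

// Columns referenced by an expression.
def refs(e: Expr): Set[String] = e match {
  case Col(n)       => Set(n)
  case Cmp(c, _, _) => refs(c)
  case And(l, r)    => refs(l) ++ refs(r)
  case Or(l, r)     => refs(l) ++ refs(r)
}

// Keep only the part of a predicate that references `allowed` columns,
// safely weakening it by dropping conjuncts that touch other columns.
// An OR survives only if BOTH branches survive, otherwise it is dropped.
def extractWithin(e: Expr, allowed: Set[String]): Option[Expr] = e match {
  case And(l, r) =>
    (extractWithin(l, allowed), extractWithin(r, allowed)) match {
      case (Some(a), Some(b)) => Some(And(a, b))
      case (Some(a), None)    => Some(a) // dropping a conjunct only weakens
      case (None, Some(b))    => Some(b)
      case _                  => None
    }
  case Or(l, r) =>
    for { a <- extractWithin(l, allowed); b <- extractWithin(r, allowed) }
      yield Or(a, b)
  case other =>
    if (refs(other).subsetOf(allowed)) Some(other) else None
}

// The predicate from the plans above: ((p = 0 AND id > 0) OR (p = 1 AND id = 2))
val pred = Or(
  And(Cmp(Col("p"), "=", 0), Cmp(Col("id"), ">", 0)),
  And(Cmp(Col("p"), "=", 1), Cmp(Col("id"), "=", 2)))

val partitionFilters = extractWithin(pred, Set("p"))   // ((p = 0) OR (p = 1))
val dataFilters      = extractWithin(pred, Set("id"))  // ((id > 0) OR (id = 2))
```

Run on the example predicate, this reproduces exactly the `PartitionFilters: [((p#10 = 0) OR (p#10 = 1))]` and `DataFilters: [((id#9L > 0) OR (id#9L = 2))]` shown in the correct v1 plan.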
In 3.2 file source v2, partition filters and data filters are NOT
separated correctly: after separating the filters, the partition filters are
`[((p#10 = 0) OR (p#10 = 1))]`, but the data filters are `[(((p#10 = 0) AND (id#9L
> 0)) OR ((p#10 = 1) AND (id#9L = 2)))]`. In the next round the data filters
are separated again, and the rule gets into an infinite loop.
In 3.3 the partition filters and data filters are NOT separated correctly
either. I am thinking of fixing 3.3 too after this PR is done.
Here is the physical plan for file source v1 in 3.3, where partition filters
and data filters ARE separated correctly:
```
== Physical Plan ==
*(1) Filter (((p#10 = 0) AND (id#9L > 0)) OR ((p#10 = 1) AND (id#9L = 2)))
+- *(1) ColumnarToRow
   +- FileScan parquet [id#9L,p#10] Batched: true, DataFilters: [((id#9L > 0) OR (id#9L = 2))], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/pt/_5f4sxy56x70dv9zpz032f0m0000gn/T/spark-c2..., PartitionFilters: [((p#10 = 0) OR (p#10 = 1))], PushedFilters: [Or(GreaterThan(id,0),EqualTo(id,2))], ReadSchema: struct<id:bigint>
```
Here is the physical plan for file source v2 in 3.3, where partition filters and
data filters are NOT separated correctly. The data filter is `[(((p#10 = 0) AND
(id#9L > 0)) OR ((p#10 = 1) AND (id#9L = 2)))]`, but the correct data filters
`[Or(GreaterThan(id,0),EqualTo(id,2))]` are still pushed down, because when
pushing down the data filters, Spark checks `canMakeFilterOn` to see whether the
filters are on data columns, and does not construct Parquet filters for
filters that are NOT on data columns.
```
== Physical Plan ==
*(1) Filter (((p#10 = 0) AND (id#9L > 0)) OR ((p#10 = 1) AND (id#9L = 2)))
+- *(1) ColumnarToRow
   +- BatchScan[id#9L, p#10] ParquetScan DataFilters: [(((p#10 = 0) AND (id#9L > 0)) OR ((p#10 = 1) AND (id#9L = 2)))], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/pt/_5f4sxy56x70dv9zpz032f0m0000gn/T/spark-e1..., PartitionFilters: [((p#10 = 0) OR (p#10 = 1))], PushedAggregation: [], PushedFilters: [Or(GreaterThan(id,0),EqualTo(id,2))], PushedGroupBy: [], ReadSchema: struct<id:bigint>, PushedFilters: [Or(GreaterThan(id,0),EqualTo(id,2))], PushedAggregation: [], PushedGroupBy: [] RuntimeFilters: []
```
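The `canMakeFilterOn` guard described above can be sketched like this (a hypothetical simplification with toy filter classes; the real check lives in Spark's `ParquetFilters` and operates on `sources.Filter` against the Parquet file schema). The point is that a filter is only translated into a Parquet predicate when every column it references is a data column, so a badly separated filter touching a partition column is silently skipped rather than mis-pushed:

```scala
// Toy stand-ins for sources.Filter; names prefixed with S to avoid
// suggesting these are Spark's actual classes.
sealed trait SFilter { def references: Set[String] }
case class SGreaterThan(col: String, v: Int) extends SFilter {
  def references = Set(col)
}
case class SEqualTo(col: String, v: Int) extends SFilter {
  def references = Set(col)
}
case class SOr(l: SFilter, r: SFilter) extends SFilter {
  def references = l.references ++ r.references
}

// A filter may only be built on columns present in the data (file) schema;
// partition columns are not in it.
def canMakeFilterOn(dataSchema: Set[String], col: String): Boolean =
  dataSchema.contains(col)

// Translate a source filter into a (string-rendered) Parquet predicate,
// returning None when any referenced column is not a data column.
def createFilter(dataSchema: Set[String], f: SFilter): Option[String] = f match {
  case SGreaterThan(c, v) if canMakeFilterOn(dataSchema, c) => Some(s"$c > $v")
  case SEqualTo(c, v) if canMakeFilterOn(dataSchema, c)     => Some(s"$c = $v")
  case SOr(l, r) =>
    for { a <- createFilter(dataSchema, l); b <- createFilter(dataSchema, r) }
      yield s"($a or $b)"
  case _ => None
}

val dataSchema = Set("id") // p is a partition column, absent from the file schema

// The correctly built pushed filter from the plan: Or(GreaterThan(id,0),EqualTo(id,2))
val pushed = SOr(SGreaterThan("id", 0), SEqualTo("id", 2))
val translated = createFilter(dataSchema, pushed) // translated and pushed

// A filter touching the partition column is skipped, not pushed.
val skipped = createFilter(dataSchema, SEqualTo("p", 0))
```

This is why the v2 plan above still shows the correct `PushedFilters: [Or(GreaterThan(id,0),EqualTo(id,2))]` even though its `DataFilters` are wrong.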
Hope I explained this clearly :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]