Josh Rosen created SPARK-27969:
----------------------------------

             Summary: Non-deterministic expressions in filters or projects can 
unnecessarily prevent all scan-time column pruning, harming performance
                 Key: SPARK-27969
                 URL: https://issues.apache.org/jira/browse/SPARK-27969
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.0
            Reporter: Josh Rosen


If a scan operator is followed by a projection or filter and those operators 
contain _any_ non-deterministic expressions then scan column pruning 
optimizations are completely skipped, harming query performance.

Here's an example of the problem:
{code:java}
import org.apache.spark.sql.functions._
val df = spark.createDataset(Seq(
  (1, 2, 3, 4, 5),
  (1, 2, 3, 4, 5)
))
val tmpPath = 
java.nio.file.Files.createTempDirectory("column-pruning-bug").toString()
df.write.parquet(tmpPath)
val fromParquet = spark.read.parquet(tmpPath){code}
If all expressions are deterministic then, as expected, column pruning is 
pushed into the scan
{code:java}
fromParquet.select("_1").explain

== Physical Plan == *(1) FileScan parquet [_1#68] Batched: true, DataFilters: 
[], Format: Parquet, Location: 
InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug7865798834978814548], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int>{code}
However, if we add a non-deterministic filter then no column pruning is 
performed (even though pruning would be safe!):
{code:java}
fromParquet.select("_1").filter(rand() =!= 0).explain

== Physical Plan ==
*(1) Project [_1#127]
+- *(1) Filter NOT (rand(-1515289268025792238) = 0.0)
+- *(1) FileScan parquet [_1#127,_2#128,_3#129,_4#130,_5#131] Batched: true, 
DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug4043817424882943496], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<_1:int,_2:int,_3:int,_4:int,_5:int>{code}
Similarly, a non-deterministic expression in a second projection can end up 
being collapsed such that it prevents column pruning:
{code:java}
fromParquet.select("_1").select($"_1", rand()).explain

== Physical Plan ==
*(1) Project [_1#127, rand(1267140591146561563) AS 
rand(1267140591146561563)#141]
+- *(1) FileScan parquet [_1#127,_2#128,_3#129,_4#130,_5#131] Batched: true, 
DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug4043817424882943496], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<_1:int,_2:int,_3:int,_4:int,_5:int>
{code}
I believe that this is caused by SPARK-10316: the Parquet column pruning code 
relies on the [{{PhysicalOperation}} unapply 
method|https://github.com/apache/spark/blob/v2.4.3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala#L43]
 for extracting projects and filters and this helper purposely fails to match 
if _any_ projection or filter is non-deterministic.

It looks like this conservative behavior may have originally been added to 
avoid pushdown / re-ordering of non-deterministic filter expressions. However, 
in this case I feel that it's _too_ conservative: even though we can't push 
down non-deterministic filters we should still be able to perform column 
pruning. 

/cc [~cloud_fan] and [~marmbrus] (it looks like you [discussed collapsing of 
non-deterministic 
projects|https://github.com/apache/spark/pull/8486#issuecomment-136036533] in 
the SPARK-10316 PR, which is related to why the third example above did not 
prune).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to