GitHub user HyukjinKwon opened a pull request:
https://github.com/apache/spark/pull/15049
[SPARK-17310][SQL] Add an option to disable record-level filter in Parquet-side
## What changes were proposed in this pull request?
There is a concern that Spark-side codegen'd row-by-row filtering might generally be faster than Parquet's record-level filtering, because the Parquet path incurs type boxing and virtual function calls that Spark's generated code avoids.
So, this PR adds an option to enable/disable record-by-record filtering on the Parquet side.
This was also discussed in https://github.com/apache/spark/pull/14671.
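As a quick illustration of how the option would be toggled outside the test harness, here is a minimal sketch. The config key is an assumption based on the `SQLConf.PARQUET_RECORD_FILTER_ENABLED` entry used in the benchmark below, and it assumes the option only takes effect on the non-vectorized Parquet read path:
```scala
// Minimal sketch; the key "spark.sql.parquet.recordLevelFilter.enabled" is an
// assumption inferred from SQLConf.PARQUET_RECORD_FILTER_ENABLED below.
// Record-level filtering applies only to the non-vectorized Parquet reader,
// so the vectorized reader is disabled first, as in the benchmark.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.recordLevelFilter.enabled", "true") // Parquet filters record by record
val filtered = spark.read.parquet("/tmp/data.parquet").filter("a = 1 OR a = 2")
filtered.count()
```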
## How was this patch tested?
Benchmarks were performed manually. I generated a billion (1,000,000,000) records and tested equality comparisons concatenated with `OR`. The filter combinations ranged from 5 to 30 predicates.
Spark-side filtering does indeed seem faster in this test case, and the gap widens as the filter tree grows.
The details are as follows:
**Code**
```scala
test("Parquet-side filter vs Spark-side filter - record by record") {
withTempPath { path =>
val N = 1000 * 1000 * 1000
val df = spark.range(N).toDF("a")
df.write.parquet(path.getAbsolutePath)
val benchmark = new Benchmark("Parquet-side vs Spark-side", N)
Seq(5, 10, 20, 30).foreach { num =>
val filterExpr = (0 to num).map(i => s"a = $i").mkString(" OR ")
benchmark.addCase(s"Parquet-side filter - number of filters [$num]",
3) { _ =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key ->
false.toString,
SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> true.toString) {
// We should strip Spark-side filter to compare correctly.
stripSparkFilter(
spark.read.parquet(path.getAbsolutePath).filter(filterExpr)).count()
}
}
benchmark.addCase(s"Spark-side filter - number of filters [$num]", 3)
{ _ =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key ->
false.toString,
SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> false.toString) {
spark.read.parquet(path.getAbsolutePath).filter(filterExpr).count()
}
}
}
benchmark.run()
}
}
```
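A note on the setup: `stripSparkFilter` is a test helper (defined in Spark's `SQLTestUtils`) that removes the Spark-side filter from the physical plan, so the Parquet-side cases count only what Parquet's record-level filter lets through. Also, since `0 to num` is inclusive, each case actually carries `num + 1` equality predicates; for example:
```scala
// What filterExpr expands to for num = 5 (six predicates, as 0 to 5 is inclusive):
val filterExpr = (0 to 5).map(i => s"a = $i").mkString(" OR ")
// "a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5"
```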
**Result**
```
Parquet-side vs Spark-side:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Parquet-side filter - number of filters [5]            4268 / 4367        234.3           4.3       0.8X
Spark-side filter - number of filters [5]              3709 / 3741        269.6           3.7       0.9X
Parquet-side filter - number of filters [10]           5673 / 5727        176.3           5.7       0.6X
Spark-side filter - number of filters [10]             3588 / 3632        278.7           3.6       0.9X
Parquet-side filter - number of filters [20]           8024 / 8440        124.6           8.0       0.4X
Spark-side filter - number of filters [20]             3912 / 3946        255.6           3.9       0.8X
Parquet-side filter - number of filters [30]          11936 / 12041       83.8          11.9       0.3X
Spark-side filter - number of filters [30]             3929 / 3978        254.5           3.9       0.8X
```
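Reading the table: `Rate(M/s)` and `Per Row(ns)` follow directly from the best time over N = 1,000,000,000 rows. For example, for the Parquet-side case with 5 filters, 1,000 M rows / 4.268 s ≈ 234.3 M rows/s, i.e. 4268 ms / 1,000 M rows ≈ 4.3 ns per row. At 30 filters, the Parquet side degrades to ≈ 11.9 ns per row while the Spark side stays flat at ≈ 3.9 ns.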
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/HyukjinKwon/spark SPARK-17310
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/15049.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #15049
----
commit 7b2e27e5e6510679323def779cf0c2f99b195adc
Author: hyukjinkwon <[email protected]>
Date: 2016-09-11T04:34:21Z
Add an option to disable record-level filter in Parquet-side
----