This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e40fce9 [SPARK-34562][SQL] Add test and doc for Parquet Bloom filter push down e40fce9 is described below commit e40fce919ab77f5faeb0bbd34dc86c56c04adbaa Author: Yuming Wang <yumw...@ebay.com> AuthorDate: Mon Apr 12 17:07:35 2021 +0300 [SPARK-34562][SQL] Add test and doc for Parquet Bloom filter push down ### What changes were proposed in this pull request? This pr add test and document for Parquet Bloom filter push down. ### Why are the changes needed? Improve document. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Generating docs: ![image](https://user-images.githubusercontent.com/5399861/114327472-c131bb80-9b6b-11eb-87a0-6f9a74eb1097.png) Closes #32123 from wangyum/SPARK-34562. Authored-by: Yuming Wang <yumw...@ebay.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- docs/sql-data-sources-load-save-functions.md | 46 +++++++++++++++++++++- .../examples/sql/JavaSQLDataSourceExample.java | 8 ++++ examples/src/main/python/sql/datasource.py | 10 +++++ examples/src/main/r/RSparkSQLExample.R | 5 +++ .../spark/examples/sql/SQLDataSourceExample.scala | 8 ++++ .../datasources/parquet/ParquetFilterSuite.scala | 29 ++++++++++++++ 6 files changed, 104 insertions(+), 2 deletions(-) diff --git a/docs/sql-data-sources-load-save-functions.md b/docs/sql-data-sources-load-save-functions.md index 0866f37..25df34e 100644 --- a/docs/sql-data-sources-load-save-functions.md +++ b/docs/sql-data-sources-load-save-functions.md @@ -105,9 +105,11 @@ To load a CSV file you can use: The extra options are also used during write operation. For example, you can control bloom filters and dictionary encodings for ORC data sources. The following ORC example will create bloom filter and use dictionary encoding only for `favorite_color`. -For Parquet, there exists `parquet.enable.dictionary`, too. +For Parquet, there exists `parquet.bloom.filter.enabled` and `parquet.enable.dictionary`, too. To find more detailed information about the extra ORC/Parquet options, -visit the official Apache ORC/Parquet websites. +visit the official Apache [ORC](https://orc.apache.org/docs/spark-config.html) / [Parquet](https://github.com/apache/parquet-mr/tree/master/parquet-hadoop) websites. + +ORC data source: <div class="codetabs"> @@ -146,6 +148,46 @@ OPTIONS ( </div> +Parquet data source: + +<div class="codetabs"> + +<div data-lang="scala" markdown="1"> +{% include_example manual_save_options_parquet scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %} +</div> + +<div data-lang="java" markdown="1"> +{% include_example manual_save_options_parquet java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %} +</div> + +<div data-lang="python" markdown="1"> +{% include_example manual_save_options_parquet python/sql/datasource.py %} +</div> + +<div data-lang="r" markdown="1"> +{% include_example manual_save_options_parquet r/RSparkSQLExample.R %} +</div> + +<div data-lang="SQL" markdown="1"> + +{% highlight sql %} +CREATE TABLE users_with_options ( + name STRING, + favorite_color STRING, + favorite_numbers array<integer> +) USING parquet +OPTIONS ( + `parquet.bloom.filter.enabled#favorite_color` true, + `parquet.bloom.filter.expected.ndv#favorite_color` 1000000, + parquet.enable.dictionary true, + parquet.page.write-checksum.enabled true +) +{% endhighlight %} + +</div> + +</div> + ### Run SQL on files directly Instead of using read API to load a file into DataFrame and query it, you can also query that diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index 53eb8fd..5dcf321 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -195,6 +195,14 @@ public class JavaSQLDataSourceExample { .option("orc.column.encoding.direct", "name") .save("users_with_options.orc"); // $example off:manual_save_options_orc$ + // $example on:manual_save_options_parquet$ + usersDF.write().format("parquet") + .option("parquet.bloom.filter.enabled#favorite_color", "true") + .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000") + .option("parquet.enable.dictionary", "true") + .option("parquet.page.write-checksum.enabled", "false") + .save("users_with_options.parquet"); + // $example off:manual_save_options_parquet$ // $example on:direct_sql$ Dataset<Row> sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`"); diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py index f3ad65f..4d7aa04 100644 --- a/examples/src/main/python/sql/datasource.py +++ b/examples/src/main/python/sql/datasource.py @@ -126,6 +126,16 @@ def basic_datasource_example(spark): .save("users_with_options.orc")) # $example off:manual_save_options_orc$ + # $example on:manual_save_options_parquet$ + df = spark.read.parquet("examples/src/main/resources/users.parquet") + (df.write.format("parquet") + .option("parquet.bloom.filter.enabled#favorite_color", "true") + .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000") + .option("parquet.enable.dictionary", "true") + .option("parquet.page.write-checksum.enabled", "false") + .save("users_with_options.parquet")) + # $example off:manual_save_options_parquet$ + # $example on:write_sorting_and_bucketing$ df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed") # $example off:write_sorting_and_bucketing$ diff --git a/examples/src/main/r/RSparkSQLExample.R b/examples/src/main/r/RSparkSQLExample.R index 86ad533..15118e1 100644 --- a/examples/src/main/r/RSparkSQLExample.R +++ b/examples/src/main/r/RSparkSQLExample.R @@ -157,6 +157,11 @@ df <- read.df("examples/src/main/resources/users.orc", "orc") write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name") # $example off:manual_save_options_orc$ +# $example on:manual_save_options_parquet$ +df <- read.df("examples/src/main/resources/users.parquet", "parquet") +write.parquet(df, "users_with_options.parquet", parquet.bloom.filter.enabled#favorite_color = true, parquet.bloom.filter.expected.ndv#favorite_color = 1000000, parquet.enable.dictionary = true, parquet.page.write-checksum.enabled = false) +# $example off:manual_save_options_parquet$ + # $example on:direct_sql$ df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") # $example off:direct_sql$ diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala index 207961b..6bd2bd6 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala @@ -129,6 +129,14 @@ object SQLDataSourceExample { .option("orc.column.encoding.direct", "name") .save("users_with_options.orc") // $example off:manual_save_options_orc$ + // $example on:manual_save_options_parquet$ + usersDF.write.format("parquet") + .option("parquet.bloom.filter.enabled#favorite_color", "true") + .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000") + .option("parquet.enable.dictionary", "true") + .option("parquet.page.write-checksum.enabled", "false") + .save("users_with_options.parquet") + // $example off:manual_save_options_parquet$ // $example on:direct_sql$ val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 329a3e4..94bda56 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1634,6 +1634,35 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } } + + test("SPARK-34562: Bloom filter push down") { + withTempPath { dir => + val path = dir.getCanonicalPath + spark.range(100).selectExpr("id * 2 AS id") + .write + .option(ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#id", true) + // Disable dictionary because the distinct values less than 40000. + .option(ParquetOutputFormat.ENABLE_DICTIONARY, false) + .parquet(path) + + Seq(true, false).foreach { bloomFilterEnabled => + withSQLConf(ParquetInputFormat.BLOOM_FILTERING_ENABLED -> bloomFilterEnabled.toString) { + val accu = new NumRowGroupsAcc + sparkContext.register(accu) + + val df = spark.read.parquet(path).filter("id = 19") + df.foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0))) + if (bloomFilterEnabled) { + assert(accu.value === 0) + } else { + assert(accu.value > 0) + } + + AccumulatorContext.remove(accu.id) + } + } + } + } } @ExtendedSQLTest --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org