[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20265
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161981031

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
    @@ -0,0 +1,243 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import java.io.File
    +
    +import scala.util.{Random, Try}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.sql.functions.monotonically_increasing_id
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.util.{Benchmark, Utils}
    +
    +
    +/**
    + * Benchmark to measure read performance with Filter pushdown.
    + */
    +object FilterPushdownBenchmark {
    +  val conf = new SparkConf()
    +  conf.set("orc.compression", "snappy")
    +  conf.set("spark.sql.parquet.compression.codec", "snappy")
    +
    +  private val spark = SparkSession.builder()
    +    .master("local[1]")
    +    .appName("FilterPushdownBenchmark")
    +    .config(conf)
    +    .getOrCreate()
    +
    +  def withTempPath(f: File => Unit): Unit = {
    +    val path = Utils.createTempDir()
    +    path.delete()
    +    try f(path) finally Utils.deleteRecursively(path)
    +  }
    +
    +  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
    +    try f finally tableNames.foreach(spark.catalog.dropTempView)
    +  }
    +
    +  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
    +    val (keys, values) = pairs.unzip
    +    val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
    +    (keys, values).zipped.foreach(spark.conf.set)
    +    try f finally {
    +      keys.zip(currentValues).foreach {
    +        case (key, Some(value)) => spark.conf.set(key, value)
    +        case (key, None) => spark.conf.unset(key)
    +      }
    +    }
    +  }
    +
    +  private def prepareTable(dir: File, numRows: Int, width: Int): Unit = {
    +    import spark.implicits._
    +    val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    +    val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
    +      .withColumn("id", monotonically_increasing_id())
    +
    +    val dirORC = dir.getCanonicalPath + "/orc"
    +    val dirParquet = dir.getCanonicalPath + "/parquet"
    +
    +    df.write.mode("overwrite").orc(dirORC)
    +    df.write.mode("overwrite").parquet(dirParquet)
    +
    +    spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
    +    spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
    +  }
    +
    +  def filterPushDownBenchmark(
    +      values: Int,
    +      title: String,
    +      whereExpr: String,
    +      selectExpr: String = "*"): Unit = {
    +    val benchmark = new Benchmark(title, values, minNumIters = 5)
    +
    +    Seq(false, true).foreach { pushDownEnabled =>
    +      val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" else ""}"
    +      benchmark.addCase(name) { _ =>
    +        withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$pushDownEnabled") {
    +          spark.sql(s"SELECT $selectExpr FROM parquetTable WHERE $whereExpr").collect()
    +        }
    +      }
    +    }
    +
    +    Seq(false, true).foreach { pushDownEnabled =>
    +      val name = s"Native ORC Vectorized ${if (pushDownEnabled) s"(Pushdown)" else ""}"
    +      benchmark.addCase(name) { _ =>
    +        withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$pushDownEnabled") {
    +          spark.sql(s"SELECT $selectExpr FROM orcTable WHERE $whereExpr").collect()
    +        }
    +      }
    +    }
    +
    +    /*
    +    Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
    +    Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
    +
    +    Select 0 row (id IS NULL):               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    +    ---------------------------------------------------------------------------------------------------
    +    Parquet Vectorized                            7906 / 79
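The `withSQLConf` helper in the quoted diff follows a save, override, restore pattern: it records the current value of each key, applies the overrides, runs the body, and puts the old state back in a `finally` block. The sketch below shows the same pattern without Spark. The object name `WithConfSketch`, the method `withConf`, and the mutable map standing in for `spark.conf` are all hypothetical and exist only for illustration.

```scala
import scala.collection.mutable

// Hypothetical stand-in for spark.conf: a plain mutable map, so the sketch runs without Spark.
object WithConfSketch {
  val conf: mutable.Map[String, String] = mutable.Map.empty

  // Apply the overrides, run the body, then restore the previous state,
  // even if the body throws.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    // Remember what each key held before the override (None if it was unset).
    val previous = pairs.map { case (key, _) => key -> conf.get(key) }
    pairs.foreach { case (key, value) => conf(key) = value }
    try body finally {
      previous.foreach {
        case (key, Some(old)) => conf(key) = old  // key existed: put the old value back
        case (key, None) => conf.remove(key)      // key was unset: unset it again
      }
    }
  }

  def main(args: Array[String]): Unit = {
    conf("pushdown.enabled") = "false"
    val seen = withConf("pushdown.enabled" -> "true", "extra.key" -> "1") {
      conf("pushdown.enabled")
    }
    println(s"inside: $seen, after: ${conf("pushdown.enabled")}, extra: ${conf.get("extra.key")}")
  }
}
```

Restoring in `finally` is what lets the benchmark toggle push-down per case without one case leaking its setting into the next.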
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161980753

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
    @@ -0,0 +1,243 @@
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161979518

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
    @@ -0,0 +1,243 @@
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161973792

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import java.io.File
    +
    +import scala.util.{Random, Try}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.sql.functions.monotonically_increasing_id
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.util.{Benchmark, Utils}
    +
    +
    +/**
    + * Benchmark to measure read performance with Filter pushdown.
    + */
    +object FilterPushdownBenchmark {
    +  val conf = new SparkConf()
    +  conf.set("orc.compression", "snappy")
    +  conf.set("spark.sql.parquet.compression.codec", "snappy")
    +
    +  private val spark = SparkSession.builder()
    +    .master("local[1]")
    +    .appName("FilterPushdownBenchmark")
    +    .config(conf)
    +    .getOrCreate()
    +
    +  def withTempPath(f: File => Unit): Unit = {
    +    val path = Utils.createTempDir()
    +    path.delete()
    +    try f(path) finally Utils.deleteRecursively(path)
    +  }
    +
    +  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
    +    try f finally tableNames.foreach(spark.catalog.dropTempView)
    +  }
    +
    +  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
    +    val (keys, values) = pairs.unzip
    +    val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
    +    (keys, values).zipped.foreach(spark.conf.set)
    +    try f finally {
    +      keys.zip(currentValues).foreach {
    +        case (key, Some(value)) => spark.conf.set(key, value)
    +        case (key, None) => spark.conf.unset(key)
    +      }
    +    }
    +  }
    +
    +  private def prepareTable(dir: File, numRows: Int, width: Int): Unit = {
    +    import spark.implicits._
    +    val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    +    val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
    +      .withColumn("id", monotonically_increasing_id())
    +
    +    val dirORC = dir.getCanonicalPath + "/orc"
    +    val dirParquet = dir.getCanonicalPath + "/parquet"
    +
    +    df.write.mode("overwrite").orc(dirORC)
    +    df.write.mode("overwrite").parquet(dirParquet)
    +
    +    spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
    +    spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
    +  }
    +
    +  def filterPushDownBenchmark(values: Int, title: String, expr: String): Unit = {
    +    val benchmark = new Benchmark(title, values, minNumIters = 5)
    +
    +    Seq(false, true).foreach { pushDownEnabled =>
    +      val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" else ""}"
    +      benchmark.addCase(name) { _ =>
    +        withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$pushDownEnabled") {
    +          spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect()
    +        }
    +      }
    +    }
    +
    +    Seq(false, true).foreach { pushDownEnabled =>
    +      val name = s"Native ORC Vectorized ${if (pushDownEnabled) s"(Pushdown)" else ""}"
    +      benchmark.addCase(name) { _ =>
    +        withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$pushDownEnabled") {
    +          spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect()
    +        }
    +      }
    +    }
    +
    +    /*
    +    Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
    +    Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
    +
    +    Select 0 row (id IS NULL):               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    +    ---------------------------------------------------------------------------------------------------
    +    Parquet Vectorized                            2091 / 2258          0.5        1993.9       1.0X
    +    Parquet Vectorized (Pushdown)                   41 / 4
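The columns of the quoted result table are related by simple arithmetic: the rate is rows divided by the best time, the per-row cost is its inverse, and the relative figure compares each case with the first row. A minimal sketch of that arithmetic follows. The object name `BenchmarkColumnsSketch` and its methods are hypothetical, and the row count of 1024 * 1024 is an assumption that does not appear in the quoted text, so the numbers it prints land close to the table's first row rather than matching it exactly (the table's best time is also rounded to whole milliseconds).

```scala
object BenchmarkColumnsSketch {
  // Rate in millions of rows per second, from the row count and the best time in milliseconds.
  def rateMPerSec(rows: Long, bestMs: Double): Double = rows / (bestMs * 1000.0)

  // Nanoseconds spent per row.
  def perRowNs(rows: Long, bestMs: Double): Double = bestMs * 1e6 / rows

  // Speed relative to the baseline case (the first row of the table).
  def relative(baselineBestMs: Double, bestMs: Double): Double = baselineBestMs / bestMs

  def main(args: Array[String]): Unit = {
    val rows = 1024L * 1024L // assumed row count; not stated in the quoted text
    val bestMs = 2091.0      // best time from the first row of the quoted table
    println(f"rate = ${rateMPerSec(rows, bestMs)}%.1f M/s")
    println(f"per row = ${perRowNs(rows, bestMs)}%.1f ns")
    println(f"relative = ${relative(bestMs, bestMs)}%.1fX")
  }
}
```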
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161973650

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
    @@ -0,0 +1,230 @@
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161964178

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
    @@ -0,0 +1,230 @@
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161963925

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
    @@ -0,0 +1,230 @@
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161963288 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. 
+ */ +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private def prepareTable(dir: File, numRows: Int, width: Int): Unit = { +import spark.implicits._ +val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i") +val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*) + .withColumn("id", monotonically_increasing_id()) + +val dirORC = dir.getCanonicalPath + "/orc" +val dirParquet = dir.getCanonicalPath + "/parquet" + +df.write.mode("overwrite").orc(dirORC) +df.write.mode("overwrite").parquet(dirParquet) + +spark.read.orc(dirORC).createOrReplaceTempView("orcTable") +spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable") + } + + def filterPushDownBenchmark(values: Int, title: String, expr: String): Unit = { +val benchmark = new Benchmark(title, values, minNumIters = 5) + +Seq(false, true).foreach { pushDownEnabled => + val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" else ""}" + benchmark.addCase(name) { _ => +withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$pushDownEnabled") { + 
spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect() +} + } +} + +Seq(false, true).foreach { pushDownEnabled => + val name = s"Native ORC Vectorized ${if (pushDownEnabled) s"(Pushdown)" else ""}" + benchmark.addCase(name) { _ => +withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$pushDownEnabled") { + spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect() +} + } +} + +/* +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2 +Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz + +Select 0 row (id IS NULL): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + --- +Parquet Vectorized 2091 / 2258 0.5 1993.9 1.0X +Parquet Vectorized (Pushdown) 41 / 44
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161671821 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private def prepareTable(dir: File, df: DataFrame): Unit = { +val dirORC = dir.getCanonicalPath + "/orc" +val dirParquet = dir.getCanonicalPath + "/parquet" + +df.write.mode("overwrite").orc(dirORC) +df.write.mode("overwrite").parquet(dirParquet) + +spark.read.orc(dirORC).createOrReplaceTempView("orcTable") +spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable") + } + + def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = { +val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values) + +withTempPath { dir => + withTempTable("t1", "orcTable", "patquetTable") { +import spark.implicits._ +val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i") +val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*) + .withColumn("id", monotonically_increasing_id()) + +df.createOrReplaceTempView("t1") +prepareTable(dir, spark.sql("SELECT * FROM t1")) + +Seq(false, true).foreach { value => + benchmark.addCase(s"Parquet Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect() +} + 
} +} + +Seq(false, true).foreach { value => + benchmark.addCase(s"Native ORC Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect() +} + } +} + +// Positive cases: Select one or no rows +/* +Java HotSpot(TM) 64-B
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161672411 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private def prepareTable(dir: File, df: DataFrame): Unit = { +val dirORC = dir.getCanonicalPath + "/orc" +val dirParquet = dir.getCanonicalPath + "/parquet" + +df.write.mode("overwrite").orc(dirORC) +df.write.mode("overwrite").parquet(dirParquet) + +spark.read.orc(dirORC).createOrReplaceTempView("orcTable") +spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable") + } + + def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = { +val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values) + +withTempPath { dir => + withTempTable("t1", "orcTable", "patquetTable") { +import spark.implicits._ +val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i") +val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*) + .withColumn("id", monotonically_increasing_id()) + +df.createOrReplaceTempView("t1") +prepareTable(dir, spark.sql("SELECT * FROM t1")) + +Seq(false, true).foreach { value => + benchmark.addCase(s"Parquet Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect() +} + 
} +} + +Seq(false, true).foreach { value => + benchmark.addCase(s"Native ORC Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect() +} + } +} + +// Positive cases: Select one or no rows +/* +Java HotSpot(TM) 64-B
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161672316 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private def prepareTable(dir: File, df: DataFrame): Unit = { +val dirORC = dir.getCanonicalPath + "/orc" +val dirParquet = dir.getCanonicalPath + "/parquet" + +df.write.mode("overwrite").orc(dirORC) +df.write.mode("overwrite").parquet(dirParquet) + +spark.read.orc(dirORC).createOrReplaceTempView("orcTable") +spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable") + } + + def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = { +val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values) + +withTempPath { dir => + withTempTable("t1", "orcTable", "patquetTable") { +import spark.implicits._ +val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i") +val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*) + .withColumn("id", monotonically_increasing_id()) + +df.createOrReplaceTempView("t1") +prepareTable(dir, spark.sql("SELECT * FROM t1")) + +Seq(false, true).foreach { value => + benchmark.addCase(s"Parquet Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect() +} + 
} +} + +Seq(false, true).foreach { value => + benchmark.addCase(s"Native ORC Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect() +} + } +} + +// Positive cases: Select one or no rows +/* +Java HotSpot(TM) 64-B
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161671868 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private def prepareTable(dir: File, df: DataFrame): Unit = { +val dirORC = dir.getCanonicalPath + "/orc" +val dirParquet = dir.getCanonicalPath + "/parquet" + +df.write.mode("overwrite").orc(dirORC) +df.write.mode("overwrite").parquet(dirParquet) + +spark.read.orc(dirORC).createOrReplaceTempView("orcTable") +spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable") + } + + def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = { +val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values) + +withTempPath { dir => + withTempTable("t1", "orcTable", "patquetTable") { +import spark.implicits._ +val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i") +val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*) + .withColumn("id", monotonically_increasing_id()) + +df.createOrReplaceTempView("t1") +prepareTable(dir, spark.sql("SELECT * FROM t1")) + +Seq(false, true).foreach { value => --- End diff -- Done.
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161671835 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private def prepareTable(dir: File, df: DataFrame): Unit = { +val dirORC = dir.getCanonicalPath + "/orc" +val dirParquet = dir.getCanonicalPath + "/parquet" + +df.write.mode("overwrite").orc(dirORC) +df.write.mode("overwrite").parquet(dirParquet) + +spark.read.orc(dirORC).createOrReplaceTempView("orcTable") +spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable") + } + + def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = { +val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values) + +withTempPath { dir => + withTempTable("t1", "orcTable", "patquetTable") { +import spark.implicits._ +val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i") +val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*) + .withColumn("id", monotonically_increasing_id()) + +df.createOrReplaceTempView("t1") +prepareTable(dir, spark.sql("SELECT * FROM t1")) + +Seq(false, true).foreach { value => + benchmark.addCase(s"Parquet Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect() +} + 
} +} + +Seq(false, true).foreach { value => + benchmark.addCase(s"Native ORC Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect() +} + } +} + +// Positive cases: Select one or no rows +/* +Java HotSpot(TM) 64-B
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161652710 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private def prepareTable(dir: File, df: DataFrame): Unit = { +val dirORC = dir.getCanonicalPath + "/orc" +val dirParquet = dir.getCanonicalPath + "/parquet" + +df.write.mode("overwrite").orc(dirORC) +df.write.mode("overwrite").parquet(dirParquet) + +spark.read.orc(dirORC).createOrReplaceTempView("orcTable") +spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable") + } + + def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = { +val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values) + +withTempPath { dir => + withTempTable("t1", "orcTable", "patquetTable") { +import spark.implicits._ +val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i") +val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*) + .withColumn("id", monotonically_increasing_id()) + +df.createOrReplaceTempView("t1") +prepareTable(dir, spark.sql("SELECT * FROM t1")) + +Seq(false, true).foreach { value => + benchmark.addCase(s"Parquet Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect() +} + 
} +} + +Seq(false, true).foreach { value => + benchmark.addCase(s"Native ORC Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect() +} + } +} + +// Positive cases: Select one or no rows +/* +Java HotSpot(TM) 64-Bit S
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161652575 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private def prepareTable(dir: File, df: DataFrame): Unit = { +val dirORC = dir.getCanonicalPath + "/orc" +val dirParquet = dir.getCanonicalPath + "/parquet" + +df.write.mode("overwrite").orc(dirORC) +df.write.mode("overwrite").parquet(dirParquet) + +spark.read.orc(dirORC).createOrReplaceTempView("orcTable") +spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable") + } + + def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = { +val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values) + +withTempPath { dir => + withTempTable("t1", "orcTable", "patquetTable") { +import spark.implicits._ +val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i") +val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*) + .withColumn("id", monotonically_increasing_id()) + +df.createOrReplaceTempView("t1") +prepareTable(dir, spark.sql("SELECT * FROM t1")) + +Seq(false, true).foreach { value => + benchmark.addCase(s"Parquet Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect() +} + 
} +} + +Seq(false, true).foreach { value => + benchmark.addCase(s"Native ORC Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect() +} + } +} + +// Positive cases: Select one or no rows +/* +Java HotSpot(TM) 64-Bit S
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161652489 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private def prepareTable(dir: File, df: DataFrame): Unit = { +val dirORC = dir.getCanonicalPath + "/orc" +val dirParquet = dir.getCanonicalPath + "/parquet" + +df.write.mode("overwrite").orc(dirORC) +df.write.mode("overwrite").parquet(dirParquet) + +spark.read.orc(dirORC).createOrReplaceTempView("orcTable") +spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable") + } + + def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = { +val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values) + +withTempPath { dir => + withTempTable("t1", "orcTable", "parquetTable") { +import spark.implicits._ +val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i") +val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*) + .withColumn("id", monotonically_increasing_id()) + +df.createOrReplaceTempView("t1") +prepareTable(dir, spark.sql("SELECT * FROM t1")) + +Seq(false, true).foreach { value => + benchmark.addCase(s"Parquet Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect() +} + 
} +} + +Seq(false, true).foreach { value => + benchmark.addCase(s"Native ORC Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect() +} + } +} + +// Positive cases: Select one or no rows +/* +Java HotSpot(TM) 64-Bit S
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161652403 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161652361 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") --- End diff -- Ah, we don't need it; we always set them in the benchmark cases.
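To illustrate the point about the benchmark cases always setting the flag themselves, here is a minimal sketch of the set-run-restore pattern that `withSQLConf` follows. It is not the PR's code: a plain mutable map stands in for `spark.conf`, and the names `WithConfSketch`, `withConf`, `PushdownKey` and `observedValues` are hypothetical, chosen only for this example.

```scala
import scala.collection.mutable

object WithConfSketch {
  // Hypothetical stand-in for spark.conf: a plain mutable map.
  val conf: mutable.Map[String, String] = mutable.Map.empty

  // Hypothetical key name, used only in this sketch.
  val PushdownKey = "example.filterPushdown"

  // Set the given pairs, run the body, then restore the previous state.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    // Remember what each key held before (None if it was unset).
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body finally {
      previous.foreach {
        case (k, Some(old)) => conf(k) = old
        case (k, None) => conf.remove(k)
      }
    }
  }

  // Each case sets the flag explicitly, so a session-wide default is never observed.
  def observedValues(): Seq[String] =
    Seq(false, true).map { pushDownEnabled =>
      withConf(PushdownKey -> pushDownEnabled.toString) {
        conf(PushdownKey)
      }
    }
}
```

Because every case wraps its query in the helper, the value seen inside a case is always the one that case asked for, and the key is back to its prior state (here: unset) once the case finishes.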
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161652184 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") --- End diff -- I think it's fine; then we don't need to care about what the default value is.
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161651566 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161651223 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161650985 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. + spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") --- End diff -- Do we need it? 
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161650904 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private def prepareTable(dir: File, df: DataFrame): Unit = { +val dirORC = dir.getCanonicalPath + "/orc" +val dirParquet = dir.getCanonicalPath + "/parquet" + +df.write.mode("overwrite").orc(dirORC) +df.write.mode("overwrite").parquet(dirParquet) + +spark.read.orc(dirORC).createOrReplaceTempView("orcTable") +spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable") + } + + def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = { +val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values) + +withTempPath { dir => + withTempTable("t1", "orcTable", "parquetTable") { +import spark.implicits._ +val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i") +val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*) + .withColumn("id", monotonically_increasing_id()) + +df.createOrReplaceTempView("t1") +prepareTable(dir, spark.sql("SELECT * FROM t1")) + +Seq(false, true).foreach { value => --- End diff -- `value` -> `pushDownEnabled`
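A small sketch of what the suggested rename could look like when building the case labels. It is only an illustration under assumptions: `CaseNameSketch` and `caseNames` are hypothetical names, and the label is assembled from a separate `suffix` value rather than a nested interpolation, which differs slightly from the diff under review.

```scala
object CaseNameSketch {
  // Builds the two benchmark case labels for a given file format.
  // The loop variable name states what the Boolean controls.
  def caseNames(format: String): Seq[String] =
    Seq(false, true).map { pushDownEnabled =>
      val suffix = if (pushDownEnabled) "(Pushdown)" else ""
      s"$format Vectorized $suffix"
    }
}
```

With the descriptive name, a reader of the loop body can tell at a glance that the Boolean toggles filter push-down, without looking back at the `Seq(false, true)` that feeds it.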
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161650716 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161650635 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161650471 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private def prepareTable(dir: File, df: DataFrame): Unit = { +val dirORC = dir.getCanonicalPath + "/orc" +val dirParquet = dir.getCanonicalPath + "/parquet" + +df.write.mode("overwrite").orc(dirORC) +df.write.mode("overwrite").parquet(dirParquet) + +spark.read.orc(dirORC).createOrReplaceTempView("orcTable") +spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable") + } + + def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = { +val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values) + +withTempPath { dir => + withTempTable("t1", "orcTable", "patquetTable") { +import spark.implicits._ +val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i") +val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*) + .withColumn("id", monotonically_increasing_id()) + +df.createOrReplaceTempView("t1") +prepareTable(dir, spark.sql("SELECT * FROM t1")) + +Seq(false, true).foreach { value => + benchmark.addCase(s"Parquet Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect() +} + 
} +} + +Seq(false, true).foreach { value => + benchmark.addCase(s"Native ORC Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect() +} + } +} + +// Positive cases: Select one or no rows +/* +Java HotSpot(TM) 64-Bit
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161650341 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private def prepareTable(dir: File, df: DataFrame): Unit = { +val dirORC = dir.getCanonicalPath + "/orc" +val dirParquet = dir.getCanonicalPath + "/parquet" + +df.write.mode("overwrite").orc(dirORC) +df.write.mode("overwrite").parquet(dirParquet) + +spark.read.orc(dirORC).createOrReplaceTempView("orcTable") +spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable") + } + + def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = { +val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values) + +withTempPath { dir => + withTempTable("t1", "orcTable", "patquetTable") { +import spark.implicits._ +val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i") +val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*) + .withColumn("id", monotonically_increasing_id()) + +df.createOrReplaceTempView("t1") +prepareTable(dir, spark.sql("SELECT * FROM t1")) + +Seq(false, true).foreach { value => + benchmark.addCase(s"Parquet Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect() +} + 
} +} + +Seq(false, true).foreach { value => + benchmark.addCase(s"Native ORC Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect() +} + } +} + +// Positive cases: Select one or no rows +/* +Java HotSpot(TM) 64-Bit
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/20265#discussion_r161637473 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. + */ +// scalastyle:off line.size.limit +object FilterPushdownBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + conf.set("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("FilterPushdownBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private def prepareTable(dir: File, df: DataFrame): Unit = { +val dirORC = dir.getCanonicalPath + "/orc" +val dirParquet = dir.getCanonicalPath + "/parquet" + +df.write.mode("overwrite").orc(dirORC) +df.write.mode("overwrite").parquet(dirParquet) + +spark.read.orc(dirORC).createOrReplaceTempView("orcTable") +spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable") + } + + def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = { +val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values) + +withTempPath { dir => + withTempTable("t1", "orcTable", "patquetTable") { +import spark.implicits._ +val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i") +val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*) + .withColumn("id", monotonically_increasing_id()) + +df.createOrReplaceTempView("t1") +prepareTable(dir, spark.sql("SELECT * FROM t1")) + +Seq(false, true).foreach { value => + benchmark.addCase(s"Parquet Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect() +} + 
} +} + +Seq(false, true).foreach { value => + benchmark.addCase(s"Native ORC Vectorized ${if (value) s"(Pushdown)" else ""}") { _ => +withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") { + spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect() +} + } +} + +// Positive cases: Select one or no rows +/* +Java HotSpot(TM) 64-B
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161620557

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    +    val benchmark = new Benchmark(s"Filter Pushdown", values)
    +
    +    withTempPath { dir =>
    +      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
    +        import spark.implicits._
    +        val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    +        val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
    +        val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
    +          .withColumn("uniqueID", monotonically_increasing_id())
    +
    +        df.createOrReplaceTempView("t1")
    +        prepareTable(dir, spark.sql("SELECT * FROM t1"))
    +
    +        Seq(false, true).foreach { value =>
    +          benchmark.addCase(s"Native ORC MR (Pushdown=$value)") { _ =>
    +            withSQLConf(
    +              SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false",
    +              SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") {
    +              spark.sql(
    +                s"""
    +                  |SELECT c1
    +                  |FROM nativeOrcTable
    +                  |WHERE uniqueID = 0 AND $whereExpr
    +                """.stripMargin).collect()
    +            }
    +          }
    +        }
    +
    +        Seq(false, true).foreach { value =>
    +          benchmark.addCase(s"Native ORC Vectorized (Pushdown=$value)") { _ =>
    +            withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") {
    +              spark.sql(
    +                s"""
    +                  |SELECT c1
    +                  |FROM nativeOrcTable
    +                  |WHERE uniqueID = 0 AND $whereExpr
    +                """.stripMargin).collect()
    +            }
    +          }
    +        }
    +
    +        /*
    +        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
    +        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
    +
    +        Filter Pushdown:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    +
    +        Native ORC MR (Pushdown=false)        16169 / 16193          0.3        3084.0       1.0X
    --- End diff --

    Yep. I see. Focusing on PPD on the best reader.
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161619856

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    --- End diff --

    Ur, do you expect there to be much difference in some cases? In general, for the `most common cases`, it will be slightly slower, as we can easily expect.
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161619207

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    +    val benchmark = new Benchmark(s"Filter Pushdown", values)
    +
    +    withTempPath { dir =>
    +      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
    +        import spark.implicits._
    +        val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    +        val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
    --- End diff --

    I see, @cloud-fan and @gatorsmile. For the most common cases, I wondered about that for Parquet, too.
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161546141

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    --- End diff --

    Could you add a test case for useless predicates too?
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161545909

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    +    val benchmark = new Benchmark(s"Filter Pushdown", values)
    +
    +    withTempPath { dir =>
    +      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
    +        import spark.implicits._
    +        val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    +        val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
    --- End diff --

    The goal of this benchmark is not to show the best case of PPD. We just want to see the perf difference in the most common cases.
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161431971

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    +    val benchmark = new Benchmark(s"Filter Pushdown", values)
    +
    +    withTempPath { dir =>
    +      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
    +        import spark.implicits._
    +        val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    +        val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
    --- End diff --

    Oh sorry, I missed the `uniqueID` part. So the `LIKE` operation is just there to make the difference larger? We don't need to do this; a simple predicate like `col = 1` or `col < 1` is enough to show how much PPD normally improves performance.
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161426568

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    +    val benchmark = new Benchmark(s"Filter Pushdown", values)
    +
    +    withTempPath { dir =>
    +      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
    +        import spark.implicits._
    +        val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    +        val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
    --- End diff --

    I mean `LIKE '%not%exist%'` will not be optimized by `LikeSimplification`.
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161425712

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    +    val benchmark = new Benchmark(s"Filter Pushdown", values)
    +
    +    withTempPath { dir =>
    +      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
    +        import spark.implicits._
    +        val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    +        val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
    --- End diff --

    Ur, @cloud-fan and @gatorsmile. The best case for PPD is when **Spark needs to do lots of processing on the returned rows but the ORC reader only returns one stripe with minimal CPU cost**. So, I designed this benchmark in order to show the difference clearly.

    1. The pushed-down predicate is only `uniqueID = 0` (minimal). We can change that into `uniqueID ==` or `uniqueID >`.
    2. The `LIKE` predicate was chosen because it's not pushed down and makes Spark do more processing. It's just one example of that kind of operation. You can ignore those predicates. We can choose some UDFs instead.
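The benchmark design described in the comment above relies on only part of the `WHERE` clause reaching the ORC reader. A minimal sketch of that split, assuming a toy predicate model: the `Predicate` case classes, `isPushable`, and `PredicateSplit` are hypothetical names used for illustration and are not Spark's internal API.

```scala
// Toy model of a conjunctive WHERE clause. These types are hypothetical and
// only illustrate the idea; they are not Spark's or ORC's real classes.
sealed trait Predicate
final case class EqualTo(column: String, value: Long) extends Predicate
final case class GreaterThan(column: String, value: Long) extends Predicate
final case class NotLike(column: String, pattern: String) extends Predicate

object PredicateSplit {
  // Assumption of this sketch: simple comparisons can be handed to the
  // reader, while pattern matching must be evaluated by the engine.
  def isPushable(p: Predicate): Boolean = p match {
    case _: EqualTo | _: GreaterThan => true
    case _: NotLike => false
  }

  // Returns (conjuncts the reader can use, conjuncts only the engine evaluates).
  def split(conjuncts: Seq[Predicate]): (Seq[Predicate], Seq[Predicate]) =
    conjuncts.partition(isPushable)

  def main(args: Array[String]): Unit = {
    val width = 3
    // Same shape as the benchmark query: uniqueID = 0 AND NOT c1 LIKE ... AND ...
    val conjuncts: Seq[Predicate] =
      EqualTo("uniqueID", 0L) +: (1 to width).map(i => NotLike(s"c$i", "%not%exist%"))
    val (pushed, engineOnly) = split(conjuncts)
    println(s"pushed to reader: $pushed")
    println(s"engine only:      $engineOnly")
  }
}
```

In this model only `uniqueID = 0` is handed to the reader, while the `NOT ... LIKE` conjuncts stay with the engine, which is the extra per-row work the benchmark uses to widen the gap. An engine may still re-check pushed predicates on the rows it receives; the sketch does not model that.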
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161421828

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    +    val benchmark = new Benchmark(s"Filter Pushdown", values)
    +
    +    withTempPath { dir =>
    +      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
    +        import spark.implicits._
    +        val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    +        val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
    +        val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
    +          .withColumn("uniqueID", monotonically_increasing_id())
    +
    +        df.createOrReplaceTempView("t1")
    +        prepareTable(dir, spark.sql("SELECT * FROM t1"))
    +
    +        Seq(false, true).foreach { value =>
    +          benchmark.addCase(s"Native ORC MR (Pushdown=$value)") { _ =>
    +            withSQLConf(
    +              SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false",
    +              SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") {
    +              spark.sql(
    +                s"""
    +                  |SELECT c1
    +                  |FROM nativeOrcTable
    +                  |WHERE uniqueID = 0 AND $whereExpr
    +                """.stripMargin).collect()
    +            }
    +          }
    +        }
    +
    +        Seq(false, true).foreach { value =>
    +          benchmark.addCase(s"Native ORC Vectorized (Pushdown=$value)") { _ =>
    +            withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") {
    +              spark.sql(
    +                s"""
    +                  |SELECT c1
    +                  |FROM nativeOrcTable
    +                  |WHERE uniqueID = 0 AND $whereExpr
    +                """.stripMargin).collect()
    +            }
    +          }
    +        }
    +
    +        /*
    +        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
    +        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
    +
    +        Filter Pushdown:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    +
    +        Native ORC MR (Pushdown=false)        16169 / 16193          0.3        3084.0       1.0X
    --- End diff --

    Let's focus on PPD for this benchmark and not disable the vectorized reader, e.g.
    ```
    col LIKE '%not%exist%'
    col LIKE '%not%exist%' (Pushdown)
    col = 3
    col = 3 (Pushdown)
    ...
    ```
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161421703

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    +    val benchmark = new Benchmark(s"Filter Pushdown", values)
    +
    +    withTempPath { dir =>
    +      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
    +        import spark.implicits._
    +        val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    +        val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
    --- End diff --

    This is kind of the best case for PPD, as the data is sorted. I'm fine with it, but let's add some more cases, at least `==` and `>`. We should follow the other benchmarks in this file to make it complete.
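Why sorted data is close to the best case can be illustrated with per-stripe min/max statistics. A minimal sketch, assuming each stripe keeps only the minimum and maximum of one column: the `StripeStats` type and the helper names are hypothetical, and real ORC statistics and indexes are richer than this.

```scala
object StripeSkipping {
  // Hypothetical stripe summary: just the min and max of one Long column.
  final case class StripeStats(min: Long, max: Long)

  // Cuts the column into consecutive stripes and records min/max per stripe.
  def stats(values: Seq[Long], stripeSize: Int): Vector[StripeStats] =
    values.grouped(stripeSize).map(s => StripeStats(s.min, s.max)).toVector

  // `col = v` can only match inside stripes whose [min, max] range contains v.
  def stripesToReadForEq(stripes: Seq[StripeStats], v: Long): Int =
    stripes.count(s => s.min <= v && v <= s.max)

  // `col > v` can only match inside stripes whose max exceeds v.
  def stripesToReadForGt(stripes: Seq[StripeStats], v: Long): Int =
    stripes.count(_.max > v)

  def main(args: Array[String]): Unit = {
    val sorted: Vector[Long] = (0L until 1000L).toVector
    val shuffled: Vector[Long] = new scala.util.Random(42).shuffle(sorted)
    for ((name, data) <- Seq("sorted" -> sorted, "shuffled" -> shuffled)) {
      val stripes = stats(data, stripeSize = 100)
      println(s"$name: col = 0   reads ${stripesToReadForEq(stripes, 0L)} of ${stripes.size} stripes")
      println(s"$name: col > 899 reads ${stripesToReadForGt(stripes, 899L)} of ${stripes.size} stripes")
      println(s"$name: col > -1  reads ${stripesToReadForGt(stripes, -1L)} of ${stripes.size} stripes")
    }
  }
}
```

With sorted values an equality predicate touches a single stripe, while after shuffling most stripe ranges tend to cover the value and little can be skipped. A predicate that every stripe can satisfy, such as `col > -1` here, skips nothing and only adds evaluation cost.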
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161406817

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    +    val benchmark = new Benchmark(s"Filter Pushdown", values)
    +
    +    withTempPath { dir =>
    +      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
    +        import spark.implicits._
    +        val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    +        val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
    --- End diff --

    Is it important for this config PR? Let's focus on the original purpose of this PR.
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161406728

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    --- End diff --

    Theoretically, useless predicates (selectivity 100%) only add additional computation for both Parquet and ORC.
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161405679

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    --- End diff --

    Have you seen any workload where predicate pushdown could be slower?
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161405630

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    +    val benchmark = new Benchmark(s"Filter Pushdown", values)
    +
    +    withTempPath { dir =>
    +      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
    +        import spark.implicits._
    +        val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
    +        val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
    --- End diff --

    Why not use a simple predicate, like `col > 5`?
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20265#discussion_r161403798

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -483,6 +484,64 @@ object OrcReadBenchmark {
         }
       }
     
    +  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
    --- End diff --

    Filter push-down depends on various properties of the data and the predicates. This is an example of filter push-down performance, intended to show some of the benefits.
[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...
GitHub user dongjoon-hyun opened a pull request:

    https://github.com/apache/spark/pull/20265

    [SPARK-21783][SQL] Turn on ORC filter push-down by default

    ## What changes were proposed in this pull request?

    ORC filter push-down has been disabled by default from the beginning (SPARK-2883). Apache Spark now depends on Apache ORC 1.4.1. For Apache Spark 2.3, this PR turns on ORC filter push-down by default, like Parquet (SPARK-9207), as a part of SPARK-20901, "Feature parity for ORC with Parquet".

    ## How was this patch tested?

    Pass the existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dongjoon-hyun/spark SPARK-21783

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20265.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20265

commit dda5bdf6865018613eeb98c6acbdd39ab2459c87
Author: Dongjoon Hyun
Date:   2018-01-14T08:20:11Z

    [SPARK-21783][SQL] Turn on ORC filter push-down by default
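For readers who want to control the setting explicitly rather than rely on the default, a minimal configuration sketch follows. It assumes Spark SQL is on the classpath; the object name `OrcPushdownConfig` and the application name are made up for illustration, while the two configuration keys are the ones behind `SQLConf.ORC_FILTER_PUSHDOWN_ENABLED` and `SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED`.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver object; requires the spark-sql dependency to compile and run.
object OrcPushdownConfig {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("OrcPushdownConfig")
      .getOrCreate()

    // Explicitly enable ORC filter push-down, the value this PR proposes as the default.
    spark.conf.set("spark.sql.orc.filterPushdown", "true")
    // The Parquet counterpart that the PR description refers to.
    spark.conf.set("spark.sql.parquet.filterPushdown", "true")

    // Print the effective value for this session.
    println(spark.conf.get("spark.sql.orc.filterPushdown"))

    spark.stop()
  }
}
```

Setting the key to `"false"` on a session restores the previous behavior for that session only, which is the same mechanism the benchmark's `withSQLConf` helper uses to compare the two modes.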