[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20265


---

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-17 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161981031
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("FilterPushdownBenchmark")
+.config(conf)
+.getOrCreate()
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(dir: File, numRows: Int, width: Int): Unit = {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("id", monotonically_increasing_id())
+
+val dirORC = dir.getCanonicalPath + "/orc"
+val dirParquet = dir.getCanonicalPath + "/parquet"
+
+df.write.mode("overwrite").orc(dirORC)
+df.write.mode("overwrite").parquet(dirParquet)
+
+spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(
+  values: Int,
+  title: String,
+  whereExpr: String,
+  selectExpr: String = "*"): Unit = {
+val benchmark = new Benchmark(title, values, minNumIters = 5)
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" 
else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT $selectExpr FROM parquetTable WHERE 
$whereExpr").collect()
+}
+  }
+}
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Native ORC Vectorized ${if (pushDownEnabled) 
s"(Pushdown)" else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT $selectExpr FROM orcTable WHERE 
$whereExpr").collect()
+}
+  }
+}
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Select 0 row (id IS NULL):               Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                          7906 / 79
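
The toggle exercised by the quoted benchmark is the same flag this PR proposes to enable by default. A minimal sketch of a single pushdown-enabled ORC read outside the benchmark harness, assuming a local SparkSession and a placeholder ORC path (illustrative only, not part of the patch):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("OrcPushdownSketch")
      .getOrCreate()

    // spark.sql.orc.filterPushdown is the flag SPARK-21783 turns on by default.
    spark.conf.set("spark.sql.orc.filterPushdown", "true")

    // "/tmp/orcTable" is a placeholder for an existing ORC dataset with an `id` column.
    val rows = spark.read.orc("/tmp/orcTable").where("id IS NULL").collect()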

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161980753
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("FilterPushdownBenchmark")
+.config(conf)
+.getOrCreate()
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(dir: File, numRows: Int, width: Int): Unit = {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("id", monotonically_increasing_id())
+
+val dirORC = dir.getCanonicalPath + "/orc"
+val dirParquet = dir.getCanonicalPath + "/parquet"
+
+df.write.mode("overwrite").orc(dirORC)
+df.write.mode("overwrite").parquet(dirParquet)
+
+spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(
+  values: Int,
+  title: String,
+  whereExpr: String,
+  selectExpr: String = "*"): Unit = {
+val benchmark = new Benchmark(title, values, minNumIters = 5)
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" 
else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT $selectExpr FROM parquetTable WHERE 
$whereExpr").collect()
+}
+  }
+}
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Native ORC Vectorized ${if (pushDownEnabled) 
s"(Pushdown)" else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT $selectExpr FROM orcTable WHERE 
$whereExpr").collect()
+}
+  }
+}
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Select 0 row (id IS NULL):               Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                          7906 / 7955

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-17 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161979518
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("FilterPushdownBenchmark")
+.config(conf)
+.getOrCreate()
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(dir: File, numRows: Int, width: Int): Unit = {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("id", monotonically_increasing_id())
+
+val dirORC = dir.getCanonicalPath + "/orc"
+val dirParquet = dir.getCanonicalPath + "/parquet"
+
+df.write.mode("overwrite").orc(dirORC)
+df.write.mode("overwrite").parquet(dirParquet)
+
+spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(
+  values: Int,
+  title: String,
+  whereExpr: String,
+  selectExpr: String = "*"): Unit = {
+val benchmark = new Benchmark(title, values, minNumIters = 5)
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" 
else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT $selectExpr FROM parquetTable WHERE 
$whereExpr").collect()
+}
+  }
+}
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Native ORC Vectorized ${if (pushDownEnabled) 
s"(Pushdown)" else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT $selectExpr FROM orcTable WHERE 
$whereExpr").collect()
+}
+  }
+}
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Select 0 row (id IS NULL):               Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                          7906 / 79

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-16 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161973792
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("FilterPushdownBenchmark")
+.config(conf)
+.getOrCreate()
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(dir: File, numRows: Int, width: Int): Unit = {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("id", monotonically_increasing_id())
+
+val dirORC = dir.getCanonicalPath + "/orc"
+val dirParquet = dir.getCanonicalPath + "/parquet"
+
+df.write.mode("overwrite").orc(dirORC)
+df.write.mode("overwrite").parquet(dirParquet)
+
+spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(values: Int, title: String, expr: String): Unit = {
+val benchmark = new Benchmark(title, values, minNumIters = 5)
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" 
else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect()
+}
+  }
+}
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Native ORC Vectorized ${if (pushDownEnabled) 
s"(Pushdown)" else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect()
+}
+  }
+}
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Select 0 row (id IS NULL):               Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                          2091 / 2258          0.5        1993.9       1.0X
+Parquet Vectorized (Pushdown)                 41 /   4

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-16 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161973650
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("FilterPushdownBenchmark")
+.config(conf)
+.getOrCreate()
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(dir: File, numRows: Int, width: Int): Unit = {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("id", monotonically_increasing_id())
+
+val dirORC = dir.getCanonicalPath + "/orc"
+val dirParquet = dir.getCanonicalPath + "/parquet"
+
+df.write.mode("overwrite").orc(dirORC)
+df.write.mode("overwrite").parquet(dirParquet)
+
+spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(values: Int, title: String, expr: String): Unit = {
+val benchmark = new Benchmark(title, values, minNumIters = 5)
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" 
else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect()
+}
+  }
+}
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Native ORC Vectorized ${if (pushDownEnabled) 
s"(Pushdown)" else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect()
+}
+  }
+}
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Select 0 row (id IS NULL):               Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                          2091 / 2258          0.5        1993.9       1.0X
+Parquet Vectorized (Pushdown)                 41 /   4

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161964178
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("FilterPushdownBenchmark")
+.config(conf)
+.getOrCreate()
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(dir: File, numRows: Int, width: Int): Unit = {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("id", monotonically_increasing_id())
+
+val dirORC = dir.getCanonicalPath + "/orc"
+val dirParquet = dir.getCanonicalPath + "/parquet"
+
+df.write.mode("overwrite").orc(dirORC)
+df.write.mode("overwrite").parquet(dirParquet)
+
+spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(values: Int, title: String, expr: String): Unit = {
+val benchmark = new Benchmark(title, values, minNumIters = 5)
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" 
else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect()
+}
+  }
+}
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Native ORC Vectorized ${if (pushDownEnabled) 
s"(Pushdown)" else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect()
+}
+  }
+}
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Select 0 row (id IS NULL):               Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                          2091 / 2258          0.5        1993.9       1.0X
+Parquet Vectorized (Pushdown)                 41 /   44

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161963925
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("FilterPushdownBenchmark")
+.config(conf)
+.getOrCreate()
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(dir: File, numRows: Int, width: Int): Unit = {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("id", monotonically_increasing_id())
+
+val dirORC = dir.getCanonicalPath + "/orc"
+val dirParquet = dir.getCanonicalPath + "/parquet"
+
+df.write.mode("overwrite").orc(dirORC)
+df.write.mode("overwrite").parquet(dirParquet)
+
+spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(values: Int, title: String, expr: String): Unit = {
+val benchmark = new Benchmark(title, values, minNumIters = 5)
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" 
else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect()
+}
+  }
+}
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Native ORC Vectorized ${if (pushDownEnabled) 
s"(Pushdown)" else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect()
+}
+  }
+}
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Select 0 row (id IS NULL):               Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                          2091 / 2258          0.5        1993.9       1.0X
+Parquet Vectorized (Pushdown)                 41 /   44
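
The prepareTable helper in the quoted diff writes the same random data as both ORC and Parquet with Snappy compression and registers the two temp views that the cases query. A standalone sketch of that preparation step, assuming an existing SparkSession named `spark`; the output path and row count below are placeholders, not taken from the patch:

    // Placeholder output directory and size.
    val out = "/tmp/filter-pushdown-bench"
    val df = spark.range(1024 * 1024)
      .selectExpr("CAST(value AS STRING) AS c1", "value AS id")

    // "compression" -> "snappy" mirrors the codecs set on SparkConf in the benchmark.
    df.write.mode("overwrite").option("compression", "snappy").orc(out + "/orc")
    df.write.mode("overwrite").option("compression", "snappy").parquet(out + "/parquet")

    // Register the two views queried by the benchmark cases.
    spark.read.orc(out + "/orc").createOrReplaceTempView("orcTable")
    spark.read.parquet(out + "/parquet").createOrReplaceTempView("parquetTable")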

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161963288
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("FilterPushdownBenchmark")
+.config(conf)
+.getOrCreate()
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(dir: File, numRows: Int, width: Int): Unit = {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("id", monotonically_increasing_id())
+
+val dirORC = dir.getCanonicalPath + "/orc"
+val dirParquet = dir.getCanonicalPath + "/parquet"
+
+df.write.mode("overwrite").orc(dirORC)
+df.write.mode("overwrite").parquet(dirParquet)
+
+spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(values: Int, title: String, expr: String): Unit = {
+val benchmark = new Benchmark(title, values, minNumIters = 5)
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" 
else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect()
+}
+  }
+}
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val name = s"Native ORC Vectorized ${if (pushDownEnabled) 
s"(Pushdown)" else ""}"
+  benchmark.addCase(name) { _ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$pushDownEnabled") {
+  spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect()
+}
+  }
+}
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Select 0 row (id IS NULL):               Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Parquet Vectorized                          2091 / 2258          0.5        1993.9       1.0X
+Parquet Vectorized (Pushdown)                 41 /   44

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161671821
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+// scalastyle:off line.size.limit
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("FilterPushdownBenchmark")
+.config(conf)
+.getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
+  spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true")
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(dir: File, df: DataFrame): Unit = {
+val dirORC = dir.getCanonicalPath + "/orc"
+val dirParquet = dir.getCanonicalPath + "/parquet"
+
+df.write.mode("overwrite").orc(dirORC)
+df.write.mode("overwrite").parquet(dirParquet)
+
+spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "orcTable", "patquetTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("id", monotonically_increasing_id())
+
+df.createOrReplaceTempView("t1")
+prepareTable(dir, spark.sql("SELECT * FROM t1"))
+
+Seq(false, true).foreach { value =>
+  benchmark.addCase(s"Parquet Vectorized ${if (value) 
s"(Pushdown)" else ""}") { _ =>
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> 
s"$value") {
+  spark.sql(s"SELECT * FROM parquetTable WHERE 
$expr").collect()
+}
+  }
+}
+
+Seq(false, true).foreach { value =>
+  benchmark.addCase(s"Native ORC Vectorized ${if (value) 
s"(Pushdown)" else ""}") { _ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$value") {
+  spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect()
+}
+  }
+}
+
+// Positive cases: Select one or no rows
+/*
+Java HotSpot(TM) 64-B

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161672411
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+// scalastyle:off line.size.limit
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("FilterPushdownBenchmark")
+.config(conf)
+.getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
+  spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true")
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(dir: File, df: DataFrame): Unit = {
+val dirORC = dir.getCanonicalPath + "/orc"
+val dirParquet = dir.getCanonicalPath + "/parquet"
+
+df.write.mode("overwrite").orc(dirORC)
+df.write.mode("overwrite").parquet(dirParquet)
+
+spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "orcTable", "patquetTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("id", monotonically_increasing_id())
+
+df.createOrReplaceTempView("t1")
+prepareTable(dir, spark.sql("SELECT * FROM t1"))
+
+Seq(false, true).foreach { value =>
+  benchmark.addCase(s"Parquet Vectorized ${if (value) 
s"(Pushdown)" else ""}") { _ =>
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> 
s"$value") {
+  spark.sql(s"SELECT * FROM parquetTable WHERE 
$expr").collect()
+}
+  }
+}
+
+Seq(false, true).foreach { value =>
+  benchmark.addCase(s"Native ORC Vectorized ${if (value) 
s"(Pushdown)" else ""}") { _ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$value") {
+  spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect()
+}
+  }
+}
+
+// Positive cases: Select one or no rows
+/*
+Java HotSpot(TM) 64-B

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161672316
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+// scalastyle:off line.size.limit
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("FilterPushdownBenchmark")
+.config(conf)
+.getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
+  spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true")
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(dir: File, df: DataFrame): Unit = {
+val dirORC = dir.getCanonicalPath + "/orc"
+val dirParquet = dir.getCanonicalPath + "/parquet"
+
+df.write.mode("overwrite").orc(dirORC)
+df.write.mode("overwrite").parquet(dirParquet)
+
+spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "orcTable", "patquetTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("id", monotonically_increasing_id())
+
+df.createOrReplaceTempView("t1")
+prepareTable(dir, spark.sql("SELECT * FROM t1"))
+
+Seq(false, true).foreach { value =>
+  benchmark.addCase(s"Parquet Vectorized ${if (value) 
s"(Pushdown)" else ""}") { _ =>
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> 
s"$value") {
+  spark.sql(s"SELECT * FROM parquetTable WHERE 
$expr").collect()
+}
+  }
+}
+
+Seq(false, true).foreach { value =>
+  benchmark.addCase(s"Native ORC Vectorized ${if (value) 
s"(Pushdown)" else ""}") { _ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$value") {
+  spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect()
+}
+  }
+}
+
+// Positive cases: Select one or no rows
+/*
+Java HotSpot(TM) 64-B

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161671868
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+// scalastyle:off line.size.limit
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("FilterPushdownBenchmark")
+.config(conf)
+.getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
+  spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true")
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(dir: File, df: DataFrame): Unit = {
+val dirORC = dir.getCanonicalPath + "/orc"
+val dirParquet = dir.getCanonicalPath + "/parquet"
+
+df.write.mode("overwrite").orc(dirORC)
+df.write.mode("overwrite").parquet(dirParquet)
+
+spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "orcTable", "patquetTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("id", monotonically_increasing_id())
+
+df.createOrReplaceTempView("t1")
+prepareTable(dir, spark.sql("SELECT * FROM t1"))
+
+Seq(false, true).foreach { value =>
--- End diff --

Done.


---
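
For reference, the set-then-restore pattern that withSQLConf implements in the quoted file can be sketched on its own as follows; it assumes an existing SparkSession named `spark` and an `orcTable` temp view, and the key and query are examples only:

    import scala.util.Try

    val key = "spark.sql.orc.filterPushdown"
    // Remember the current value, or None if the key is unset.
    val previous = Try(spark.conf.get(key)).toOption

    spark.conf.set(key, "false")
    try {
      spark.sql("SELECT * FROM orcTable WHERE id IS NULL").collect()
    } finally {
      // Restore the original setting, or clear it if it was not set before.
      previous match {
        case Some(v) => spark.conf.set(key, v)
        case None => spark.conf.unset(key)
      }
    }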

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161671835
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+// scalastyle:off line.size.limit
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("FilterPushdownBenchmark")
+.config(conf)
+.getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
+  spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true")
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(dir: File, df: DataFrame): Unit = {
+val dirORC = dir.getCanonicalPath + "/orc"
+val dirParquet = dir.getCanonicalPath + "/parquet"
+
+df.write.mode("overwrite").orc(dirORC)
+df.write.mode("overwrite").parquet(dirParquet)
+
+spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "orcTable", "patquetTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("id", monotonically_increasing_id())
+
+df.createOrReplaceTempView("t1")
+prepareTable(dir, spark.sql("SELECT * FROM t1"))
+
+Seq(false, true).foreach { value =>
+  benchmark.addCase(s"Parquet Vectorized ${if (value) 
s"(Pushdown)" else ""}") { _ =>
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> 
s"$value") {
+  spark.sql(s"SELECT * FROM parquetTable WHERE 
$expr").collect()
+}
+  }
+}
+
+Seq(false, true).foreach { value =>
+  benchmark.addCase(s"Native ORC Vectorized ${if (value) 
s"(Pushdown)" else ""}") { _ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$value") {
+  spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect()
+}
+  }
+}
+
+// Positive cases: Select one or no rows
+/*
+Java HotSpot(TM) 64-B

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161652710
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161652575
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161652489
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161652403
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161652361
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
+  spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true")
--- End diff --

ah we don't need it, we always set them in the benchmark cases.
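
For illustration, a minimal sketch of that point, reusing the `spark` session and the `withSQLConf` helper quoted above (the `assert` lines exist only for this sketch): a session-level default is merely a fallback that every measured case shadows, so it cannot influence the numbers.

    // Sketch only: the per-case override shadows the session default while the
    // block runs, and withSQLConf restores the previous value afterwards.
    val key = SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key
    spark.conf.set(key, "true")                // the session default under discussion

    withSQLConf(key -> "false") {              // what each benchmark case does
      assert(spark.conf.get(key) == "false")   // the case sees its own setting
    }
    assert(spark.conf.get(key) == "true")      // the default is back once the case ends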


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161652184
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
+  spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true")
--- End diff --

I think it's fine, then we don't need to care about what the default value 
is.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161651566
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161651223
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161650985
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
+  spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true")
--- End diff --

Do we need it?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161650904
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
+Seq(false, true).foreach { value =>
--- End diff --

`value` -> `pushDownEnabled`
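
For illustration, a minimal sketch of how the Parquet case would read with the suggested rename, reusing `benchmark`, `withSQLConf`, `spark` and the `expr` parameter from the quoted file (same logic, only the bound variable renamed):

    // Sketch of the suggested rename: the boolean now names what it toggles.
    Seq(false, true).foreach { pushDownEnabled =>
      benchmark.addCase(s"Parquet Vectorized ${if (pushDownEnabled) "(Pushdown)" else ""}") { _ =>
        withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushDownEnabled.toString) {
          spark.sql(s"SELECT * FROM parquetTable WHERE $expr").collect()
        }
      }
    }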


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161650716
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161650635
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161650471
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161650341
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161637473
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+// scalastyle:off line.size.limit
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("FilterPushdownBenchmark")
+.config(conf)
+.getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
+  spark.conf.set(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key, "true")
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(dir: File, df: DataFrame): Unit = {
+val dirORC = dir.getCanonicalPath + "/orc"
+val dirParquet = dir.getCanonicalPath + "/parquet"
+
+df.write.mode("overwrite").orc(dirORC)
+df.write.mode("overwrite").parquet(dirParquet)
+
+spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(values: Int, width: Int, expr: String): Unit 
= {
+val benchmark = new Benchmark(s"Filter Pushdown ($expr)", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "orcTable", "patquetTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) 
c$i")
+val df = spark.range(values).map(_ => 
Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("id", monotonically_increasing_id())
+
+df.createOrReplaceTempView("t1")
+prepareTable(dir, spark.sql("SELECT * FROM t1"))
+
+Seq(false, true).foreach { value =>
+  benchmark.addCase(s"Parquet Vectorized ${if (value) 
s"(Pushdown)" else ""}") { _ =>
+withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> 
s"$value") {
+  spark.sql(s"SELECT * FROM parquetTable WHERE 
$expr").collect()
+}
+  }
+}
+
+Seq(false, true).foreach { value =>
+  benchmark.addCase(s"Native ORC Vectorized ${if (value) 
s"(Pushdown)" else ""}") { _ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$value") {
+  spark.sql(s"SELECT * FROM orcTable WHERE $expr").collect()
+}
+  }
+}
+
+// Positive cases: Select one or no rows
+/*
+Java HotSpot(TM) 64-B
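
For reference, a minimal sketch (not part of the diff) of how the helpers above could be driven end to end; the row count, column width, and predicates below are made-up values:

```
  // Hypothetical driver method for the object above; sizes are arbitrary.
  def main(args: Array[String]): Unit = {
    val numRows = 1024 * 1024 * 15
    val width = 5
    // One pushdown-friendly point lookup on the sorted id column, and one
    // selective range predicate; each exercises both the ORC and Parquet paths.
    Seq("id = 0", s"id < ${numRows / 100}").foreach { expr =>
      filterPushDownBenchmark(numRows, width, expr)
    }
  }
```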

[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161620557
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
+val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("uniqueID", monotonically_increasing_id())
+
+df.createOrReplaceTempView("t1")
+prepareTable(dir, spark.sql("SELECT * FROM t1"))
+
+Seq(false, true).foreach { value =>
+  benchmark.addCase(s"Native ORC MR (Pushdown=$value)") { _ =>
+withSQLConf(
+  SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false",
+  SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") {
+  spark.sql(
+s"""
+   |SELECT c1
+   |FROM nativeOrcTable
+   |WHERE uniqueID = 0 AND $whereExpr
+ """.stripMargin).collect()
+}
+  }
+}
+
+Seq(false, true).foreach { value =>
+  benchmark.addCase(s"Native ORC Vectorized (Pushdown=$value)") { 
_ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$value") {
+  spark.sql(
+s"""
+   |SELECT c1
+   |FROM nativeOrcTable
+   |WHERE uniqueID = 0 AND $whereExpr
+ """.stripMargin).collect()
+}
+  }
+}
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Filter Pushdown:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Native ORC MR (Pushdown=false)              16169 / 16193          0.3        3084.0       1.0X
--- End diff --

Yep, I see. I'll focus on PPD with the best reader.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161619856
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
--- End diff --

Ur, do you expect there will be much difference in some cases?
In the general `most common cases`, it will be slightly slower, as we can easily expect.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161619207
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
--- End diff --

I see, @cloud-fan and @gatorsmile.
For the most common cases, I have wondered about that for Parquet, too.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161546141
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
--- End diff --

Could you add a test case for useless predicates too?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-15 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161545909
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
--- End diff --

The goal of this benchmark is not to show the best case of PPD. We just 
want to see the perf difference of the most common cases. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161431971
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
--- End diff --

Oh sorry, I missed the `uniqueID` part. So the `like` operation is just to make the difference larger? We don't need to do this; a simple predicate like `col = 1` or `col < 1` is enough to show how much PPD normally improves performance.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-14 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161426568
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
--- End diff --

I mean `LIKE '%not%exist%'` will not be optimized by `LikeSimplification`.
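
(A rough illustration, not from the thread, of that distinction; the rewrites below describe the `LikeSimplification` rule as I understand it around Spark 2.3, so they are an assumption rather than a quote from the optimizer, and `c1` is just the benchmark's first string column:)

```
import org.apache.spark.sql.functions.col

// Single-wildcard patterns are the shapes LikeSimplification can rewrite
// into cheap string predicates:
val simplifiable = Seq(
  col("c1").like("abc%"),    // roughly StartsWith(c1, "abc")
  col("c1").like("%abc"),    // roughly EndsWith(c1, "abc")
  col("c1").like("%abc%")    // roughly Contains(c1, "abc")
)
// Multiple interior wildcards stay as a genuine LIKE match, so every surviving
// row keeps paying the full matching cost on the Spark side -- exactly the
// per-row work this benchmark relies on.
val notSimplified = col("c1").like("%not%exist%")
```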


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-14 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161425712
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
--- End diff --

Ur, @cloud-fan and @gatorsmile.

The best case for PPD is **Spark needs to do lots of processing on the returned rows, but the ORC reader only returns one stripe with minimal CPU cost**.

So, I designed this benchmark in order to show the difference clearly.
1. The pushed-down predicate is only `uniqueID = 0` (minimal). We can change that into `uniqueID ==` or `uniqueID >`.
2. The `LIKE` predicate is chosen because it's not pushed down and makes Spark do more processing. It's just one example of that kind of operation. You can ignore those predicates; we can choose some UDFs instead.
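
(As a rough way to check that split, not taken from the PR: a sketch assuming the benchmark's `withSQLConf` helper, the `nativeOrcTable` view, and the `uniqueID` column from the quoted diff, and assuming the scan node reports its pushed filters in the plan output:)

```
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
  spark.sql(
    """
      |EXPLAIN
      |SELECT c1 FROM nativeOrcTable
      |WHERE uniqueID = 0 AND NOT c1 LIKE '%not%exist%'
    """.stripMargin).show(truncate = false)
  // Expectation: `uniqueID = 0` shows up as a pushed filter on the scan, while
  // the NOT ... LIKE predicate remains in Spark's Filter operator and is
  // evaluated row by row.
}
```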


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161421828
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
+val df = spark.range(values).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("uniqueID", monotonically_increasing_id())
+
+df.createOrReplaceTempView("t1")
+prepareTable(dir, spark.sql("SELECT * FROM t1"))
+
+Seq(false, true).foreach { value =>
+  benchmark.addCase(s"Native ORC MR (Pushdown=$value)") { _ =>
+withSQLConf(
+  SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false",
+  SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$value") {
+  spark.sql(
+s"""
+   |SELECT c1
+   |FROM nativeOrcTable
+   |WHERE uniqueID = 0 AND $whereExpr
+ """.stripMargin).collect()
+}
+  }
+}
+
+Seq(false, true).foreach { value =>
+  benchmark.addCase(s"Native ORC Vectorized (Pushdown=$value)") { 
_ =>
+withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> 
s"$value") {
+  spark.sql(
+s"""
+   |SELECT c1
+   |FROM nativeOrcTable
+   |WHERE uniqueID = 0 AND $whereExpr
+ """.stripMargin).collect()
+}
+  }
+}
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+Filter Pushdown:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Native ORC MR (Pushdown=false)              16169 / 16193          0.3        3084.0       1.0X
--- End diff --

let's focus on PPD for this benchmark and not disable the vectorized reader, e.g.
```
col LIKE '%not%exist%'
col LIKE '%not%exist%' (Pushdown)
col = 3
col = 3 (Pushdown)
...
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161421703
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
--- End diff --

This is kind of the best case for PPD, as the data is sorted. I'm fine with it, but let's add some more cases, at least `==` and `>`. We should follow the other benchmarks in this file to make it complete.
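
(A back-of-the-envelope illustration, not from the thread, of why the sorted id column is close to the best case; the stripe size and row count below are invented numbers:)

```
// monotonically_increasing_id() writes ids in increasing order, so each ORC
// stripe's [min, max] statistics cover a disjoint range and a point predicate
// such as `uniqueID = 0` can rule out all but one stripe.
val numRows       = 15L * 1024 * 1024   // invented example size
val rowsPerStripe = 1L * 1024 * 1024    // invented stripe granularity
val totalStripes  = numRows / rowsPerStripe
val stripesRead   = 1L                  // only the stripe whose range contains 0
println(s"stripes skipped: ${totalStripes - stripesRead} of $totalStripes")
```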


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-14 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161406817
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
--- End diff --

Is it important for this config PR?
Let’s focus on the original purpose of this PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-14 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161406728
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
--- End diff --

Theoretically, useless predicates (selectivity 100%) only add additional computation for both Parquet and ORC.
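
(For example, an assumption about what such a case could look like, reusing the quoted benchmark's schema: a predicate that matches every row skips nothing, but the reader still has to evaluate it against the statistics and the rows.)

```
// `uniqueID >= 0` has 100% selectivity: no stripe or row group can be skipped,
// so with pushdown enabled the min/max checks are pure overhead here.
spark.sql(
  """
    |SELECT c1
    |FROM nativeOrcTable
    |WHERE uniqueID >= 0
  """.stripMargin).collect()
```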


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161405679
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
--- End diff --

Have you seen any workload where predicate pushdown could be slower?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161405630
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
+val benchmark = new Benchmark(s"Filter Pushdown", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val whereExpr = (1 to width).map(i => s"NOT c$i LIKE '%not%exist%'").mkString(" AND ")
--- End diff --

Why not use a simple predicate, like `col > 5`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-14 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20265#discussion_r161403798
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -483,6 +484,64 @@ object OrcReadBenchmark {
 }
   }
 
+  def filterPushDownBenchmark(values: Int, width: Int): Unit = {
--- End diff --

Filter push-down depends on various properties of the data and the predicates. This is an example of filter push-down performance, intended to show some of the benefits.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20265: [SPARK-21783][SQL] Turn on ORC filter push-down b...

2018-01-14 Thread dongjoon-hyun
GitHub user dongjoon-hyun opened a pull request:

https://github.com/apache/spark/pull/20265

[SPARK-21783][SQL] Turn on ORC filter push-down by default

## What changes were proposed in this pull request?

ORC filter push-down has been disabled by default from the beginning (SPARK-2883).

Now, Apache Spark depends on Apache ORC 1.4.1. For Apache Spark 2.3, this PR turns on ORC filter push-down by default, like Parquet (SPARK-9207), as a part of SPARK-20901, "Feature parity for ORC with Parquet".

## How was this patch tested?

Pass the existing tests.
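
For reference, a minimal sketch (not part of this patch) of how the setting can be inspected or overridden once the default flips; the config key names assume Spark 2.3:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("orc-ppd-default-check")
  .master("local[1]")
  // Only needed if a user wants to opt back out of the new default.
  .config("spark.sql.orc.filterPushdown", "false")
  .getOrCreate()

// With this change the ORC default becomes true, matching Parquet.
println(spark.conf.get("spark.sql.orc.filterPushdown"))      // "false" (overridden above)
println(spark.conf.get("spark.sql.parquet.filterPushdown"))  // "true"
```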

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dongjoon-hyun/spark SPARK-21783

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20265.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20265


commit dda5bdf6865018613eeb98c6acbdd39ab2459c87
Author: Dongjoon Hyun 
Date:   2018-01-14T08:20:11Z

[SPARK-21783][SQL] Turn on ORC filter push-down by default




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org