[jira] [Commented] (SPARK-30162) Filter is not being pushed down for Parquet files
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991573#comment-16991573 ] Nasir Ali commented on SPARK-30162: --- [~aman_omer] added in my question > Filter is not being pushed down for Parquet files > - > > Key: SPARK-30162 > URL: https://issues.apache.org/jira/browse/SPARK-30162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 > Environment: pyspark 3.0 preview > Ubuntu/Centos > pyarrow 0.14.1 >Reporter: Nasir Ali >Priority: Major > > Filters are not pushed down in Spark 3.0 preview. Also the output of > "explain" method is different. It is hard to debug in 3.0 whether filters > were pushed down or not. Below code could reproduce the bug: > > {code:java} > // code placeholder > df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), > ("usr1",13.00, "2018-03-11T12:27:18+00:00"), > ("usr1",25.00, "2018-03-12T11:27:18+00:00"), > ("usr1",20.00, "2018-03-13T15:27:18+00:00"), > ("usr1",17.00, "2018-03-14T12:27:18+00:00"), > ("usr2",99.00, "2018-03-15T11:27:18+00:00"), > ("usr2",156.00, "2018-03-22T11:27:18+00:00"), > ("usr2",17.00, "2018-03-31T11:27:18+00:00"), > ("usr2",25.00, "2018-03-15T11:27:18+00:00"), > ("usr2",25.00, "2018-03-16T11:27:18+00:00") > ], >["user","id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = > spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True) > {code} > {code:java} > // Spark 2.4 output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#40 = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == > Filter (isnotnull(user#40) && (user#40 = usr2)) > +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == > *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, > Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, > PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], > ReadSchema: struct{code} > {code:java} > // Spark 3.0.0-preview output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed > Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#2 = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized > Logical Plan == > Filter (isnotnull(user#2) AND (user#2 = usr2)) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical > Plan == > *(1) Project [id#0, ts#1, user#2] > +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2)) >+- *(1) ColumnarToRow > +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: > InMemoryFileIndex[file:/home/cnali/data], ReadSchema: > struct > {code} > I have tested it on much larger dataset. Spark 3.0 tries to load whole data > and then apply filter. Whereas Spark 2.4 push down the filter. Above output > shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview. > > Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and > it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't > work. > > {code:java} > // pyspark 3 shell output > $ pyspark > Python 3.6.8 (default, Aug 7 2019, 17:28:10) > [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux > Type "help", "copyright", "credits" or "license" for more information. > Warning: Ignoring non-spark config property: > java.io.dir=/md2k/data1,/md2k/data2,/md2k/data3,/md2k/data4,/md2k/data5,/md2k/data6,/md2k/data7,/md2k/data8 > 19/12/09 07:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library > for your platform... using builtin-java classes where applicable > Using Spark's default log4j profile: > org/apache/spark/log4j-defaults.properties > Setting default log level to "WARN". > To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use > setLogLevel(newLevel). > 19/12/09 07:05:36 WARN SparkConf: Note that spark.local.dir will be > overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in > mesos/standalone/kubernetes and LOCAL_DIRS in YARN). > Welcome to > __ > / __/__ ___ _/ /__ > _\ \/ _ \/ _ `/ __/ '_/ >/__ / .__/\_,_/_/ /_/\_\ version 3.0.0-preview > /_/Using
[jira] [Commented] (SPARK-30162) Filter is not being pushed down for Parquet files
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991137#comment-16991137 ] Aman Omer commented on SPARK-30162: --- Kindly share the results of spark-shell. Thanks > Filter is not being pushed down for Parquet files > - > > Key: SPARK-30162 > URL: https://issues.apache.org/jira/browse/SPARK-30162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 > Environment: pyspark 3.0 preview > Ubuntu/Centos > pyarrow 0.14.1 >Reporter: Nasir Ali >Priority: Major > > Filters are not pushed down in Spark 3.0 preview. Also the output of > "explain" method is different. It is hard to debug in 3.0 whether filters > were pushed down or not. Below code could reproduce the bug: > > {code:java} > // code placeholder > df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), > ("usr1",13.00, "2018-03-11T12:27:18+00:00"), > ("usr1",25.00, "2018-03-12T11:27:18+00:00"), > ("usr1",20.00, "2018-03-13T15:27:18+00:00"), > ("usr1",17.00, "2018-03-14T12:27:18+00:00"), > ("usr2",99.00, "2018-03-15T11:27:18+00:00"), > ("usr2",156.00, "2018-03-22T11:27:18+00:00"), > ("usr2",17.00, "2018-03-31T11:27:18+00:00"), > ("usr2",25.00, "2018-03-15T11:27:18+00:00"), > ("usr2",25.00, "2018-03-16T11:27:18+00:00") > ], >["user","id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = > spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True) > {code} > {code:java} > // Spark 2.4 output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#40 = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == > Filter (isnotnull(user#40) && (user#40 = usr2)) > +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == > *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, > Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, > PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], > ReadSchema: struct{code} > {code:java} > // Spark 3.0.0-preview output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed > Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#2 = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized > Logical Plan == > Filter (isnotnull(user#2) AND (user#2 = usr2)) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical > Plan == > *(1) Project [id#0, ts#1, user#2] > +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2)) >+- *(1) ColumnarToRow > +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: > InMemoryFileIndex[file:/home/cnali/data], ReadSchema: > struct > {code} > I have tested it on much larger dataset. Spark 3.0 tries to load whole data > and then apply filter. Whereas Spark 2.4 push down the filter. Above output > shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview. > > Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and > it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't > work. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-30162) Filter is not being pushed down for Parquet files
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16990741#comment-16990741 ] Nasir Ali commented on SPARK-30162: --- No, I only use pyspark shell. > Filter is not being pushed down for Parquet files > - > > Key: SPARK-30162 > URL: https://issues.apache.org/jira/browse/SPARK-30162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 > Environment: pyspark 3.0 preview > Ubuntu/Centos > pyarrow 0.14.1 >Reporter: Nasir Ali >Priority: Major > > Filters are not pushed down in Spark 3.0 preview. Also the output of > "explain" method is different. It is hard to debug in 3.0 whether filters > were pushed down or not. Below code could reproduce the bug: > > {code:java} > // code placeholder > df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), > ("usr1",13.00, "2018-03-11T12:27:18+00:00"), > ("usr1",25.00, "2018-03-12T11:27:18+00:00"), > ("usr1",20.00, "2018-03-13T15:27:18+00:00"), > ("usr1",17.00, "2018-03-14T12:27:18+00:00"), > ("usr2",99.00, "2018-03-15T11:27:18+00:00"), > ("usr2",156.00, "2018-03-22T11:27:18+00:00"), > ("usr2",17.00, "2018-03-31T11:27:18+00:00"), > ("usr2",25.00, "2018-03-15T11:27:18+00:00"), > ("usr2",25.00, "2018-03-16T11:27:18+00:00") > ], >["user","id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = > spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True) > {code} > {code:java} > // Spark 2.4 output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#40 = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == > Filter (isnotnull(user#40) && (user#40 = usr2)) > +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == > *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, > Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, > PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], > ReadSchema: struct{code} > {code:java} > // Spark 3.0.0-preview output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed > Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#2 = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized > Logical Plan == > Filter (isnotnull(user#2) AND (user#2 = usr2)) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical > Plan == > *(1) Project [id#0, ts#1, user#2] > +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2)) >+- *(1) ColumnarToRow > +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: > InMemoryFileIndex[file:/home/cnali/data], ReadSchema: > struct > {code} > I have tested it on much larger dataset. Spark 3.0 tries to load whole data > and then apply filter. Whereas Spark 2.4 push down the filter. Above output > shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview. > > Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and > it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't > work. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-30162) Filter is not being pushed down for Parquet files
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16990725#comment-16990725 ] t oo commented on SPARK-30162: -- did u try on scala sparkshell? > Filter is not being pushed down for Parquet files > - > > Key: SPARK-30162 > URL: https://issues.apache.org/jira/browse/SPARK-30162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 > Environment: pyspark 3.0 preview > Ubuntu/Centos > pyarrow 0.14.1 >Reporter: Nasir Ali >Priority: Major > > Filters are not pushed down in Spark 3.0 preview. Also the output of > "explain" method is different. It is hard to debug in 3.0 whether filters > were pushed down or not. Below code could reproduce the bug: > > {code:java} > // code placeholder > df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), > ("usr1",13.00, "2018-03-11T12:27:18+00:00"), > ("usr1",25.00, "2018-03-12T11:27:18+00:00"), > ("usr1",20.00, "2018-03-13T15:27:18+00:00"), > ("usr1",17.00, "2018-03-14T12:27:18+00:00"), > ("usr2",99.00, "2018-03-15T11:27:18+00:00"), > ("usr2",156.00, "2018-03-22T11:27:18+00:00"), > ("usr2",17.00, "2018-03-31T11:27:18+00:00"), > ("usr2",25.00, "2018-03-15T11:27:18+00:00"), > ("usr2",25.00, "2018-03-16T11:27:18+00:00") > ], >["user","id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = > spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True) > {code} > {code:java} > // Spark 2.4 output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#40 = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == > Filter (isnotnull(user#40) && (user#40 = usr2)) > +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == > *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, > Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, > PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], > ReadSchema: struct{code} > {code:java} > // Spark 3.0.0-preview output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed > Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#2 = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized > Logical Plan == > Filter (isnotnull(user#2) AND (user#2 = usr2)) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical > Plan == > *(1) Project [id#0, ts#1, user#2] > +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2)) >+- *(1) ColumnarToRow > +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: > InMemoryFileIndex[file:/home/cnali/data], ReadSchema: > struct > {code} > I have tested it on much larger dataset. Spark 3.0 tries to load whole data > and then apply filter. Whereas Spark 2.4 push down the filter. Above output > shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview. > > Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and > it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't > work. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org