Oh yes, thanks for noting that using sc.hadoopConfiguration.set also works :-)
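For anyone who needs this before the fix lands, a minimal sketch of the two ways of getting the property into the Hadoop Configuration that are discussed below, assuming a Spark 1.2-era spark-shell where sc is the usual SparkContext:

    // Option 1: set the property directly on the SparkContext's Hadoop Configuration.
    sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")

    // Option 2: prefix the property with "spark.hadoop." so that Spark copies it into
    // the Hadoop Configuration (this form also works in spark-defaults.conf or via --conf):
    //   spark-shell --conf spark.hadoop.parquet.task.side.metadata=false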
On Wed, Jan 21, 2015 at 7:11 AM, Yana Kadiyska <yana.kadiy...@gmail.com> wrote:

Thanks for looking, Cheng. Just to clarify in case other people need this sooner: setting sc.hadoopConfiguration.set("parquet.task.side.metadata","false") did work well in terms of dropping row groups and showing a small input size. What was odd is that the overall time wasn't much better...but maybe that was overhead from sending the metadata client side.

Thanks again, and looking forward to your fix.

On Tue, Jan 20, 2015 at 9:07 PM, Cheng Lian <lian.cs....@gmail.com> wrote:

Hey Yana,

Sorry for the late reply, I missed this important thread somehow. And many thanks for reporting this. It turned out to be a bug: filter pushdown is only enabled when using client side metadata, which is not expected, because the task side metadata code path is more performant. I guess the reason why setting parquet.task.side.metadata to false didn't reduce input size for you is that you set the configuration with the Spark API, or put it into spark-defaults.conf. This configuration belongs to the Hadoop Configuration, and Spark only merges properties whose names start with spark.hadoop into Hadoop Configuration instances. You may try putting the parquet.task.side.metadata config into Hadoop's core-site.xml and then re-running the query. I can see significant differences by doing so.

I'll open a JIRA and deliver a fix for this ASAP. Thanks again for reporting all the details!

Cheng

On 1/13/15 12:56 PM, Yana Kadiyska wrote:

Attempting to bump this up in case someone can help out after all. I spent a few good hours stepping through the code today, so I'll summarize my observations, both in the hope of getting some help and to help others who might be looking into this:

1. I am setting spark.sql.parquet.filterPushdown=true
2. I can see by stepping through the driver in the debugger that ParquetTableOperations.execute sets the filters via ParquetInputFormat.setFilterPredicate (I checked the conf object, things appear OK there)
3. In FilteringParquetRowInputFormat, I go through the code path for getTaskSideSplits. It seems that the code path for getClientSideSplits would try to drop row groups, but I don't see anything similar in getTaskSideSplits.

Does anyone have pointers on where to look after this? Where does row group filtering happen in the case of getTaskSideSplits? I can attach to the executor but am not quite sure what Parquet-related code gets called executor side...I also don't see any messages in the executor logs related to filtering predicates.

For comparison, I went through getClientSideSplits and can see that predicate pushdown works OK:

    sc.hadoopConfiguration.set("parquet.task.side.metadata","false")

    15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client Side Metadata Split Strategy
    15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate: eq(epoch, 1417384800)
    15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572 row groups that do not pass filter predicate (28 %) !

Is it possible that this is just a UI bug? I can see Input=4G when using ("parquet.task.side.metadata","false") and Input=140G when using ("parquet.task.side.metadata","true"), but the runtimes are very comparable.

[image: Inline image 1]

JobId 4 is the ClientSide split, JobId 5 is the TaskSide split.
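For reference, a rough sketch of the comparison described above, assuming the Spark 1.2-era shell API; the table name and predicate values are just the placeholders used in this thread:

    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")

    // Client side metadata: row groups are dropped on the driver, and the UI shows the smaller input size.
    sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")
    sqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604").collect()

    // Task side metadata (the default): same query, which before the fix did not push the filter down.
    sc.hadoopConfiguration.set("parquet.task.side.metadata", "true")
    sqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604").collect()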
On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska <yana.kadiy...@gmail.com> wrote:

I am running the following (connecting to an external Hive Metastore):

    /a/shark/spark/bin/spark-shell --master spark://ip:7077 --conf spark.sql.parquet.filterPushdown=true

    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

and then ran two queries:

    sqlContext.sql("select count(*) from table where partition='blah' ")
    sqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604")

According to the Input tab in the UI, both scan about 140G of data, which is the size of my whole partition. So I have two questions:

1. Is there a way to tell from the plan whether a predicate pushdown is supposed to happen? I see this for the second query:

    res0: org.apache.spark.sql.SchemaRDD =
    SchemaRDD[0] at RDD at SchemaRDD.scala:108
    == Query Plan ==
    == Physical Plan ==
    Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
     Exchange SinglePartition
      Aggregate true, [], [COUNT(1) AS PartialCount#49L]
       OutputFaker []
        Project []
         ParquetTableScan [epoch#139L], (ParquetRelation <list of hdfs files>

2. Am I doing something obviously wrong that this is not working? (I'm guessing it's not working because the input size for the second query shows unchanged and the execution time is almost 2x as long.)

Thanks in advance for any insights.
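A minimal sketch of inspecting the plan from the shell, assuming the same Spark 1.2-era API (queryExecution/executedPlan on SchemaRDD); note that the pushed Parquet predicate itself is set on the Hadoop Configuration via ParquetInputFormat.setFilterPredicate, as described above, so it may not show up in the printed plan text:

    // Build the query lazily, then print its physical plan without running it.
    val rdd = sqlContext.sql(
      "select count(*) from table where partition='blah' and epoch=1415561604")
    println(rdd.queryExecution.executedPlan)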