Oh yes, thanks for adding that using sc.hadoopConfiguration.set also works
:-)
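
To recap the two settings discussed in the thread below in one place, a minimal
sketch (property names are as reported in the messages that follow):

// enable Parquet filter pushdown in Spark SQL
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
// use client side Parquet metadata so row groups are dropped up front
sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")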

On Wed, Jan 21, 2015 at 7:11 AM, Yana Kadiyska <yana.kadiy...@gmail.com>
wrote:

> Thanks for looking, Cheng. Just to clarify in case other people need this
> sooner: setting sc.hadoopConfiguration.set("parquet.task.side.metadata",
> "false") did work well in terms of dropping row groups and showing a small
> input size. What was odd about that is that the overall time wasn't much
> better... but maybe that was overhead from sending the metadata client side.
>
> Thanks again and looking forward to your fix
>
> On Tue, Jan 20, 2015 at 9:07 PM, Cheng Lian <lian.cs....@gmail.com> wrote:
>
>>  Hey Yana,
>>
>> Sorry for the late reply, I missed this important thread somehow. And many
>> thanks for reporting this. It turned out to be a bug: filter pushdown is
>> only enabled when using client side metadata, which is not expected,
>> because the task side metadata code path is more performant. I guess the
>> reason why setting parquet.task.side.metadata to false didn't reduce the
>> input size for you is that you set the configuration via the Spark API or
>> put it into spark-defaults.conf. This configuration belongs in the Hadoop
>> Configuration, and Spark only merges properties whose names start with
>> spark.hadoop into Hadoop Configuration instances. You may try putting the
>> parquet.task.side.metadata config into Hadoop's core-site.xml, and then
>> re-run the query. I can see significant differences by doing so.
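>>
>> For reference, a minimal core-site.xml entry for that property could look
>> like the following sketch (assuming the standard Hadoop XML property format):
>>
>> <property>
>>   <name>parquet.task.side.metadata</name>
>>   <value>false</value>
>> </property>
>>
>> Given the spark.hadoop prefix merging described above, passing
>> --conf spark.hadoop.parquet.task.side.metadata=false to spark-shell should
>> also land the property in the Hadoop Configuration.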
>>
>> I’ll open a JIRA and deliver a fix for this ASAP. Thanks again for
>> reporting all the details!
>>
>> Cheng
>>
>> On 1/13/15 12:56 PM, Yana Kadiyska wrote:
>>
>>   Attempting to bump this up in case someone can help out after all. I
>> spent a few good hours stepping through the code today, so I'll summarize
>> my observations, both in the hope of getting some help and to help others
>> who might be looking into this:
>>
>>  1. I am setting *spark.sql.parquet.filterPushdown=true*
>> 2. I can see by stepping through the driver debugger that
>> ParquetTableOperations.execute sets the filters via
>> ParquetInputFormat.setFilterPredicate (I checked the conf object, and things
>> appear OK there; see the sketch after this list)
>> 3. In FilteringParquetRowInputFormat, I go through the code path for
>> getTaskSideSplits. It seems that the code path for getClientSideSplits would
>> try to drop row groups, but I don't see anything similar in getTaskSideSplits.
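>>
>> As a rough sketch of what step 2 amounts to (assuming the parquet-mr 1.6
>> API, where the classes live under the parquet.* package), the driver sets
>> the predicate on the job configuration roughly like this:
>>
>> import org.apache.hadoop.conf.Configuration
>> import parquet.filter2.predicate.FilterApi
>> import parquet.hadoop.ParquetInputFormat
>>
>> val conf = new Configuration()
>> // build a predicate equivalent to eq(epoch, 1417384800) from the log below
>> val pred = FilterApi.eq(FilterApi.longColumn("epoch"),
>>   java.lang.Long.valueOf(1417384800L))
>> // serialize the predicate into the configuration used by the Parquet input format
>> ParquetInputFormat.setFilterPredicate(conf, pred)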
>>
>>  Does anyone have pointers on where to look after this? Where does
>> row group filtering happen in the case of getTaskSideSplits? I can attach
>> to the executor but am not quite sure what Parquet-related code gets called
>> on the executor side... I also don't see any messages in the executor logs
>> related to filtering predicates.
>>
>> For comparison, I went through the getClientSideSplits code path and can
>> see that predicate pushdown works OK:
>>
>>
>> sc.hadoopConfiguration.set("parquet.task.side.metadata","false")
>>
>> 15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client Side 
>> Metadata Split Strategy
>> 15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate: eq(epoch, 
>> 1417384800)
>> 15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572 row 
>> groups that do not pass filter predicate (28 %) !
>>
>>
>>  Is it possible that this is just a UI bug? I see Input=4G when
>> using ("parquet.task.side.metadata","false") and Input=140G when using
>> ("parquet.task.side.metadata","true"), but the runtimes are very comparable.
>>
>>  [image: Inline image 1]
>>
>>
>>  JobId 4 is the ClientSide split, JobId 5 is the TaskSide split.
>>
>>
>>
>>  On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska <yana.kadiy...@gmail.com>
>> wrote:
>>
>>> I am running the following (connecting to an external Hive Metastore)
>>>
>>>   /a/shark/spark/bin/spark-shell --master spark://ip:7077  --conf
>>> *spark.sql.parquet.filterPushdown=true*
>>>
>>>  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>
>>>  and then ran two queries:
>>>
>>> sqlContext.sql("select count(*) from table where partition='blah' ") and
>>> sqlContext.sql("select count(*) from table where partition='blah' and
>>> epoch=1415561604")
>>>
>>>
>>>  According to the Input tab in the UI, both scan about 140G of data,
>>> which is the size of my whole partition. So I have two questions --
>>>
>>>  1. Is there a way to tell from the plan whether a predicate pushdown is
>>> supposed to happen?
>>> I see this for the second query:
>>>
>>> res0: org.apache.spark.sql.SchemaRDD =
>>> SchemaRDD[0] at RDD at SchemaRDD.scala:108
>>> == Query Plan ==
>>> == Physical Plan ==
>>> Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
>>>  Exchange SinglePartition
>>>   Aggregate true, [], [COUNT(1) AS PartialCount#49L]
>>>    OutputFaker []
>>>     Project []
>>>      ParquetTableScan [epoch#139L], (ParquetRelation <list of hdfs files>
>>>
>>> ​
>>>  2. Am I doing something obviously wrong that makes this not work? (I'm
>>> guessing it's not working because the input size for the second query is
>>> unchanged and the execution time is almost 2x as long.)
>>>
>>>  thanks in advance for any insights
>>>
>>>
>>
>
>
