Re: Partition pruning in spark 1.5.2

2016-04-06 Thread Darshan Singh
It worked fine, and this is exactly what I was looking for, as I do not want
to cache the dataframe because the data in some of the partitions will change.
However, I have a much larger number of partitions (the column is not just
country but something whose values can run into the hundreds of thousands).
Now the metadata is much bigger than the individual partitions, and the jobs
spend their time reading that metadata.

Is there a way to keep just the metadata in cache? I do not want to use
Hive.
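
For reference, the pattern I have in mind is roughly the following (a rough
sketch only; the path, names and option are placeholders and assumptions on my
part, not something verified on 1.5.2). The idea is to create the partitioned
DataFrame once and reuse it, so the directory listing and schema discovery are
not repeated for every query:

// build the DataFrame once; partition discovery runs here
val events = sqlContext.read.option("mergeSchema", "false").parquet("/path/to/data")
events.registerTempTable("events")

// later queries reuse the already-discovered partition metadata
sqlContext.sql("SELECT count(*) FROM events WHERE country = 'UK'").show()

// spark.sql.parquet.cacheMetadata (default true) should also keep the Parquet
// footer metadata cached between queries; worth double-checking on 1.5.2.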

Will Tachyon provide better performance than HDFS in this scenario? I have
yet to try it.

Thanks a lot for all your help.

On Tue, Apr 5, 2016 at 9:41 PM, Darshan Singh 
wrote:

> Thanks a lot. I will try this one  as well.
>
> On Tue, Apr 5, 2016 at 9:28 PM, Michael Armbrust 
> wrote:
>
>> The following should ensure partition pruning happens:
>>
>> df.write.partitionBy("country").save("/path/to/data")
>> sqlContext.read.load("/path/to/data").where("country = 'UK'")
>>
>> On Tue, Apr 5, 2016 at 1:13 PM, Darshan Singh 
>> wrote:
>>
>>> Thanks for the reply.
>>>
>>> Now I saved the part_movies as parquet file.
>>>
>>> Then created new dataframe from the saved parquet file and I did not
>>> persist it. The i ran the same query. It still read all 20 partitions and
>>> this time from hdfs.
>>>
>>> So what will be exact scenario when it will prune partitions. I am bit
>>> confused now. Isnt there a way to see the exact partition pruning?
>>>
>>> Thanks
>>>
>>> On Tue, Apr 5, 2016 at 8:59 PM, Michael Armbrust wrote:
>>>
 For the in-memory cache, we still launch tasks, we just skip blocks
 when possible using statistics about those blocks.

 On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh 
 wrote:

> Thanks. It is not my exact scenario but I have tried to reproduce it.
> I have used 1.5.2.
>
> I have a part-movies data-frame which has 20 partitions 1 each for a
> movie.
>
> I created following query
>
>
> val part_sql = sqlContext.sql("select * from part_movies where movie =
> 10")
> part_sql.count()
>
> I expect that this should just read from 1 partition i.e. partition
> 10. Other partitions it should max read metadata and not the data.
>
> here is physical plan. I could see the filter. From here i can not say
> whether this filter is causing any partition pruning. If actually pruning
> is happening i would like to see a operator which mentions the same.
>
> == Physical Plan ==
> TungstenAggregate(key=[], 
> functions=[(count(1),mode=Final,isDistinct=false)], output=[count#75L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], 
> output=[currentCount#93L])
>Project
> Filter (movie#33 = 10)
>  InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], 
> (InMemoryRelation [movie#33,title#34,genres#35], true, 1, 
> StorageLevel(true, true, false, true, 1), (Scan 
> PhysicalRDD[movie#33,title#34,genres#35]), None)
>
>
> However, my assumption that partition is not pruned is not based on
> the above plan but when I look at the job and its stages. I could see that
> it has read full data of the dataframe.  I should see around 65KB as that
> is almost average size of each partition.
>
> Aggregated Metrics by Executor
> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks 
> Input
> Size / Records Shuffle Write Size / Records
> driver localhost:53247 0.4 s 20 0 20 1289.0 KB / 20 840.0 B / 20
>
>
> Task details only first 7. Here I expect that except 1 task(which
> access the partitions data) all others should be either 0 KB or just the
> size of metadata after which it discarded that partition as its data was
> not needed. But i could see that all the partitions are read.
>
> This is small example so it doesnt make diff but for a large dataframe
> reading all the data even that in memory takes time.
>
> Tasks
>
> 0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39
> ms 12 ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
> 1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41
> ms 9 ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
> 2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40
> ms 7 ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
> 3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6
> ms 3 ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
> 4 31 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4
> ms 4 ms 6 ms 1 ms 0 ms 0.0 B 69.2 

RE: Partition pruning in spark 1.5.2

2016-04-05 Thread Yong Zhang
Hi, Michael:
I would like to ask the same question: if the DF is hash partitioned and then 
cached, and we then query/filter by the column it was hash partitioned on, will 
Spark be smart enough to do the partition pruning in this case, instead of 
depending on Parquet's partition pruning? I think that is the original question.
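
Concretely, the scenario I mean is something like this (just a sketch; the
table and column names are placeholders, and as far as I know DataFrame does
not have repartition-by-column in 1.5, so the hash partitioning would have to
come from SQL, e.g. DISTRIBUTE BY on a HiveContext):

// assumes sqlContext is a HiveContext so that DISTRIBUTE BY is understood
val byCountry = sqlContext.sql("SELECT * FROM events DISTRIBUTE BY country")
byCountry.registerTempTable("events_by_country")
sqlContext.cacheTable("events_by_country")        // in-memory columnar cache

// filter on the same column the data was distributed by
sqlContext.sql("SELECT count(*) FROM events_by_country WHERE country = 'UK'").show()
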
Thanks
Yong

From: mich...@databricks.com
Date: Tue, 5 Apr 2016 13:28:46 -0700
Subject: Re: Partition pruning in spark 1.5.2
To: darshan.m...@gmail.com
CC: user@spark.apache.org

The following should ensure partition pruning happens:
df.write.partitionBy("country").save("/path/to/data")
sqlContext.read.load("/path/to/data").where("country = 'UK'")
On Tue, Apr 5, 2016 at 1:13 PM, Darshan Singh <darshan.m...@gmail.com> wrote:
Thanks for the reply.
Now I saved the part_movies as parquet file.
Then created new dataframe from the saved parquet file and I did not persist 
it. The i ran the same query. It still read all 20 partitions and this time 
from hdfs.
So what will be exact scenario when it will prune partitions. I am bit confused 
now. Isnt there a way to see the exact partition pruning?
Thanks
On Tue, Apr 5, 2016 at 8:59 PM, Michael Armbrust <mich...@databricks.com> wrote:
For the in-memory cache, we still launch tasks, we just skip blocks when 
possible using statistics about those blocks.
On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh <darshan.m...@gmail.com> wrote:
Thanks. It is not my exact scenario but I have tried to reproduce it. I have 
used 1.5.2.
I have a part-movies data-frame which has 20 partitions 1 each for a movie.
I created following query

val part_sql = sqlContext.sql("select * from part_movies where movie = 10")
part_sql.count()
I expect that this should just read from 1 partition i.e. partition 10. Other 
partitions it should max read metadata and not the data.
here is physical plan. I could see the filter. From here i can not say whether 
this filter is causing any partition pruning. If actually pruning is happening 
i would like to see a operator which mentions the same.
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], 
output=[count#75L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], 
functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#93L])
   Project
Filter (movie#33 = 10)
 InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], (InMemoryRelation 
[movie#33,title#34,genres#35], true, 1, StorageLevel(true, true, false, 
true, 1), (Scan PhysicalRDD[movie#33,title#34,genres#35]), None)
However, my assumption that partition is not pruned is not based on the above 
plan but when I look at the job and its stages. I could see that it has read 
full data of the dataframe.  I should see around 65KB as that is almost average 
size of each partition.
Aggregated Metrics by Executor
Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks Input Size / Records Shuffle Write Size / Records
driver localhost:53247 0.4 s 20 0 20 1289.0 KB / 20 840.0 B / 20

Task details only first 7. Here I expect that except 1 task(which access the 
partitions data) all others should be either 0 KB or just the size of metadata 
after which it discarded that partition as its data was not needed. But i could 
see that all the partitions are read.
This is small example so it doesnt make diff but for a large dataframe reading 
all the data even that in memory takes time.
Tasks

0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39 ms 12 ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41 ms 9 ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40 ms 7 ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6 ms 3 ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1

Re: Partition pruning in spark 1.5.2

2016-04-05 Thread Darshan Singh
Thanks a lot. I will try this one  as well.

On Tue, Apr 5, 2016 at 9:28 PM, Michael Armbrust 
wrote:

> The following should ensure partition pruning happens:
>
> df.write.partitionBy("country").save("/path/to/data")
> sqlContext.read.load("/path/to/data").where("country = 'UK'")
>
> On Tue, Apr 5, 2016 at 1:13 PM, Darshan Singh 
> wrote:
>
>> Thanks for the reply.
>>
>> Now I saved the part_movies as parquet file.
>>
>> Then created new dataframe from the saved parquet file and I did not
>> persist it. The i ran the same query. It still read all 20 partitions and
>> this time from hdfs.
>>
>> So what will be exact scenario when it will prune partitions. I am bit
>> confused now. Isnt there a way to see the exact partition pruning?
>>
>> Thanks
>>
>> On Tue, Apr 5, 2016 at 8:59 PM, Michael Armbrust 
>> wrote:
>>
>>> For the in-memory cache, we still launch tasks, we just skip blocks when
>>> possible using statistics about those blocks.
>>>
>>> On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh 
>>> wrote:
>>>
 Thanks. It is not my exact scenario but I have tried to reproduce it. I
 have used 1.5.2.

 I have a part-movies data-frame which has 20 partitions 1 each for a
 movie.

 I created following query


 val part_sql = sqlContext.sql("select * from part_movies where movie =
 10")
 part_sql.count()

 I expect that this should just read from 1 partition i.e. partition 10.
 Other partitions it should max read metadata and not the data.

 here is physical plan. I could see the filter. From here i can not say
 whether this filter is causing any partition pruning. If actually pruning
 is happening i would like to see a operator which mentions the same.

 == Physical Plan ==
 TungstenAggregate(key=[], 
 functions=[(count(1),mode=Final,isDistinct=false)], output=[count#75L])
  TungstenExchange SinglePartition
   TungstenAggregate(key=[], 
 functions=[(count(1),mode=Partial,isDistinct=false)], 
 output=[currentCount#93L])
Project
 Filter (movie#33 = 10)
  InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], 
 (InMemoryRelation [movie#33,title#34,genres#35], true, 1, 
 StorageLevel(true, true, false, true, 1), (Scan 
 PhysicalRDD[movie#33,title#34,genres#35]), None)


 However, my assumption that partition is not pruned is not based on the
 above plan but when I look at the job and its stages. I could see that it
 has read full data of the dataframe.  I should see around 65KB as that is
 almost average size of each partition.

 Aggregated Metrics by Executor
 Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks 
 Input
 Size / Records Shuffle Write Size / Records
 driver localhost:53247 0.4 s 20 0 20 1289.0 KB / 20 840.0 B / 20


 Task details only first 7. Here I expect that except 1 task(which
 access the partitions data) all others should be either 0 KB or just the
 size of metadata after which it discarded that partition as its data was
 not needed. But i could see that all the partitions are read.

 This is small example so it doesnt make diff but for a large dataframe
 reading all the data even that in memory takes time.

 Tasks

 0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39
 ms 12 ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
 1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41
 ms 9 ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
 2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40
 ms 7 ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
 3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6
 ms 3 ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
 4 31 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4
 ms 4 ms 6 ms 1 ms 0 ms 0.0 B 69.2 KB (memory) / 1 42.0 B / 1
 5 32 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5
 ms 2 ms 5 ms 0 ms 0 ms 0.0 B 60.3 KB (memory) / 1 42.0 B / 1
 6 33 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5
 ms 3 ms 4 ms 0 ms 0 ms 0.0 B 70.3 KB (memory) / 1 42.0 B / 1
 7 34 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4
 ms 5 ms 4 ms 0 ms 0 ms 0.0 B 59.7 KB (memory) / 1 42.0 B / 1

 Let me know if you need anything else.

 Thanks




 On Tue, Apr 5, 2016 at 7:29 PM, Michael Armbrust <
 mich...@databricks.com> wrote:

> Can you show your full code.  How are you partitioning the data? How
> are you reading it?  What is the resulting query plan (run 

Re: Partition pruning in spark 1.5.2

2016-04-05 Thread Michael Armbrust
The following should ensure partition pruning happens:

df.write.partitionBy("country").save("/path/to/data")
sqlContext.read.load("/path/to/data").where("country = 'UK'")
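
To confirm that the pruning actually kicks in, a check along these lines may
help (a sketch; the path is the placeholder from above, and the expectation
about input size is based on the numbers mentioned earlier in this thread):

df.write.partitionBy("country").save("/path/to/data")

val uk = sqlContext.read.load("/path/to/data").where("country = 'UK'")
uk.explain()   // in 1.5 the pruning may not show up as a separate operator,
               // so the Input Size on the stage page is the more reliable signal
uk.count()     // input size should correspond to the country=UK directory only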

On Tue, Apr 5, 2016 at 1:13 PM, Darshan Singh 
wrote:

> Thanks for the reply.
>
> Now I saved the part_movies as parquet file.
>
> Then created new dataframe from the saved parquet file and I did not
> persist it. The i ran the same query. It still read all 20 partitions and
> this time from hdfs.
>
> So what will be exact scenario when it will prune partitions. I am bit
> confused now. Isnt there a way to see the exact partition pruning?
>
> Thanks
>
> On Tue, Apr 5, 2016 at 8:59 PM, Michael Armbrust 
> wrote:
>
>> For the in-memory cache, we still launch tasks, we just skip blocks when
>> possible using statistics about those blocks.
>>
>> On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh 
>> wrote:
>>
>>> Thanks. It is not my exact scenario but I have tried to reproduce it. I
>>> have used 1.5.2.
>>>
>>> I have a part-movies data-frame which has 20 partitions 1 each for a
>>> movie.
>>>
>>> I created following query
>>>
>>>
>>> val part_sql = sqlContext.sql("select * from part_movies where movie =
>>> 10")
>>> part_sql.count()
>>>
>>> I expect that this should just read from 1 partition i.e. partition 10.
>>> Other partitions it should max read metadata and not the data.
>>>
>>> here is physical plan. I could see the filter. From here i can not say
>>> whether this filter is causing any partition pruning. If actually pruning
>>> is happening i would like to see a operator which mentions the same.
>>>
>>> == Physical Plan ==
>>> TungstenAggregate(key=[], 
>>> functions=[(count(1),mode=Final,isDistinct=false)], output=[count#75L])
>>>  TungstenExchange SinglePartition
>>>   TungstenAggregate(key=[], 
>>> functions=[(count(1),mode=Partial,isDistinct=false)], 
>>> output=[currentCount#93L])
>>>Project
>>> Filter (movie#33 = 10)
>>>  InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], 
>>> (InMemoryRelation [movie#33,title#34,genres#35], true, 1, 
>>> StorageLevel(true, true, false, true, 1), (Scan 
>>> PhysicalRDD[movie#33,title#34,genres#35]), None)
>>>
>>>
>>> However, my assumption that partition is not pruned is not based on the
>>> above plan but when I look at the job and its stages. I could see that it
>>> has read full data of the dataframe.  I should see around 65KB as that is
>>> almost average size of each partition.
>>>
>>> Aggregated Metrics by Executor
>>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks Input
>>> Size / Records Shuffle Write Size / Records
>>> driver localhost:53247 0.4 s 20 0 20 1289.0 KB / 20 840.0 B / 20
>>>
>>>
>>> Task details only first 7. Here I expect that except 1 task(which access
>>> the partitions data) all others should be either 0 KB or just the size of
>>> metadata after which it discarded that partition as its data was not
>>> needed. But i could see that all the partitions are read.
>>>
>>> This is small example so it doesnt make diff but for a large dataframe
>>> reading all the data even that in memory takes time.
>>>
>>> Tasks
>>>
>>> 0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39
>>> ms 12 ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
>>> 1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41
>>> ms 9 ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
>>> 2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40
>>> ms 7 ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
>>> 3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6 ms 3
>>> ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
>>> 4 31 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 4
>>> ms 6 ms 1 ms 0 ms 0.0 B 69.2 KB (memory) / 1 42.0 B / 1
>>> 5 32 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 2
>>> ms 5 ms 0 ms 0 ms 0.0 B 60.3 KB (memory) / 1 42.0 B / 1
>>> 6 33 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 3
>>> ms 4 ms 0 ms 0 ms 0.0 B 70.3 KB (memory) / 1 42.0 B / 1
>>> 7 34 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 5
>>> ms 4 ms 0 ms 0 ms 0.0 B 59.7 KB (memory) / 1 42.0 B / 1
>>>
>>> Let me know if you need anything else.
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>> On Tue, Apr 5, 2016 at 7:29 PM, Michael Armbrust wrote:
>>>
 Can you show your full code.  How are you partitioning the data? How
 are you reading it?  What is the resulting query plan (run explain() or
 EXPLAIN).

 On Tue, Apr 5, 2016 at 10:02 AM, dsing001 
 wrote:

> HI,
>
> I am using 1.5.2. I have a dataframe which is partitioned based on the
> country. So I have around 150 partition in the 

Re: Partition pruning in spark 1.5.2

2016-04-05 Thread Darshan Singh
Thanks for the reply.

Now I saved part_movies as a parquet file.

Then I created a new dataframe from the saved parquet file and did not
persist it. Then I ran the same query. It still read all 20 partitions, and
this time from HDFS.

So in what exact scenario will it prune partitions? I am a bit confused
now. Isn't there a way to see the exact partition pruning?

Thanks

On Tue, Apr 5, 2016 at 8:59 PM, Michael Armbrust 
wrote:

> For the in-memory cache, we still launch tasks, we just skip blocks when
> possible using statistics about those blocks.
>
> On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh 
> wrote:
>
>> Thanks. It is not my exact scenario but I have tried to reproduce it. I
>> have used 1.5.2.
>>
>> I have a part-movies data-frame which has 20 partitions 1 each for a
>> movie.
>>
>> I created following query
>>
>>
>> val part_sql = sqlContext.sql("select * from part_movies where movie =
>> 10")
>> part_sql.count()
>>
>> I expect that this should just read from 1 partition i.e. partition 10.
>> Other partitions it should max read metadata and not the data.
>>
>> here is physical plan. I could see the filter. From here i can not say
>> whether this filter is causing any partition pruning. If actually pruning
>> is happening i would like to see a operator which mentions the same.
>>
>> == Physical Plan ==
>> TungstenAggregate(key=[], 
>> functions=[(count(1),mode=Final,isDistinct=false)], output=[count#75L])
>>  TungstenExchange SinglePartition
>>   TungstenAggregate(key=[], 
>> functions=[(count(1),mode=Partial,isDistinct=false)], 
>> output=[currentCount#93L])
>>Project
>> Filter (movie#33 = 10)
>>  InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], 
>> (InMemoryRelation [movie#33,title#34,genres#35], true, 1, 
>> StorageLevel(true, true, false, true, 1), (Scan 
>> PhysicalRDD[movie#33,title#34,genres#35]), None)
>>
>>
>> However, my assumption that partition is not pruned is not based on the
>> above plan but when I look at the job and its stages. I could see that it
>> has read full data of the dataframe.  I should see around 65KB as that is
>> almost average size of each partition.
>>
>> Aggregated Metrics by Executor
>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks Input
>> Size / Records Shuffle Write Size / Records
>> driver localhost:53247 0.4 s 20 0 20 1289.0 KB / 20 840.0 B / 20
>>
>>
>> Task details only first 7. Here I expect that except 1 task(which access
>> the partitions data) all others should be either 0 KB or just the size of
>> metadata after which it discarded that partition as its data was not
>> needed. But i could see that all the partitions are read.
>>
>> This is small example so it doesnt make diff but for a large dataframe
>> reading all the data even that in memory takes time.
>>
>> Tasks
>>
>> 0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39 ms 12
>> ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
>> 1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41 ms 9
>> ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
>> 2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40 ms 7
>> ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
>> 3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6 ms 3
>> ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
>> 4 31 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 4
>> ms 6 ms 1 ms 0 ms 0.0 B 69.2 KB (memory) / 1 42.0 B / 1
>> 5 32 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 2
>> ms 5 ms 0 ms 0 ms 0.0 B 60.3 KB (memory) / 1 42.0 B / 1
>> 6 33 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 3
>> ms 4 ms 0 ms 0 ms 0.0 B 70.3 KB (memory) / 1 42.0 B / 1
>> 7 34 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 5
>> ms 4 ms 0 ms 0 ms 0.0 B 59.7 KB (memory) / 1 42.0 B / 1
>>
>> Let me know if you need anything else.
>>
>> Thanks
>>
>>
>>
>>
>> On Tue, Apr 5, 2016 at 7:29 PM, Michael Armbrust 
>> wrote:
>>
>>> Can you show your full code.  How are you partitioning the data? How are
>>> you reading it?  What is the resulting query plan (run explain() or
>>> EXPLAIN).
>>>
>>> On Tue, Apr 5, 2016 at 10:02 AM, dsing001 
>>> wrote:
>>>
 HI,

 I am using 1.5.2. I have a dataframe which is partitioned based on the
 country. So I have around 150 partition in the dataframe. When I run
 sparksql and use country = 'UK' it still reads all partitions and not
 able
 to prune other partitions. Thus all the queries run for similar times
 independent of what country I pass. Is it desired?

 Is there a way to fix this in 1.5.2 by using some parameter or is it
 fixed
 in latest versions?

 Thanks



 --
 View 

Re: Partition pruning in spark 1.5.2

2016-04-05 Thread Michael Armbrust
For the in-memory cache, we still launch tasks, we just skip blocks when
possible using statistics about those blocks.
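
In code, that looks roughly like this (a sketch based on the example from this
thread; the two config names are from memory, so please verify them against the
1.5.2 docs):

sqlContext.cacheTable("part_movies")   // builds the in-memory columnar cache
sqlContext.sql("SELECT count(*) FROM part_movies WHERE movie = 10").show()
// a task still runs for every cached partition, but within each partition the
// columnar batches carry min/max statistics, so non-matching batches are skipped
// spark.sql.inMemoryColumnarStorage.batchSize        - rows per batch (default 10000)
// spark.sql.inMemoryColumnarStorage.partitionPruning - enables the batch skipping (default true)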

On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh 
wrote:

> Thanks. It is not my exact scenario but I have tried to reproduce it. I
> have used 1.5.2.
>
> I have a part-movies data-frame which has 20 partitions 1 each for a movie.
>
> I created following query
>
>
> val part_sql = sqlContext.sql("select * from part_movies where movie = 10")
> part_sql.count()
>
> I expect that this should just read from 1 partition i.e. partition 10.
> Other partitions it should max read metadata and not the data.
>
> here is physical plan. I could see the filter. From here i can not say
> whether this filter is causing any partition pruning. If actually pruning
> is happening i would like to see a operator which mentions the same.
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], 
> output=[count#75L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], 
> output=[currentCount#93L])
>Project
> Filter (movie#33 = 10)
>  InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], 
> (InMemoryRelation [movie#33,title#34,genres#35], true, 1, 
> StorageLevel(true, true, false, true, 1), (Scan 
> PhysicalRDD[movie#33,title#34,genres#35]), None)
>
>
> However, my assumption that partition is not pruned is not based on the
> above plan but when I look at the job and its stages. I could see that it
> has read full data of the dataframe.  I should see around 65KB as that is
> almost average size of each partition.
>
> Aggregated Metrics by Executor
> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks Input
> Size / Records Shuffle Write Size / Records
> driver localhost:53247 0.4 s 20 0 20 1289.0 KB / 20 840.0 B / 20
>
>
> Task details only first 7. Here I expect that except 1 task(which access
> the partitions data) all others should be either 0 KB or just the size of
> metadata after which it discarded that partition as its data was not
> needed. But i could see that all the partitions are read.
>
> This is small example so it doesnt make diff but for a large dataframe
> reading all the data even that in memory takes time.
>
> Tasks
>
> 0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39 ms 12
> ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
> 1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41 ms 9
> ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
> 2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40 ms 7
> ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
> 3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6 ms 3
> ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
> 4 31 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 4
> ms 6 ms 1 ms 0 ms 0.0 B 69.2 KB (memory) / 1 42.0 B / 1
> 5 32 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 2
> ms 5 ms 0 ms 0 ms 0.0 B 60.3 KB (memory) / 1 42.0 B / 1
> 6 33 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 3
> ms 4 ms 0 ms 0 ms 0.0 B 70.3 KB (memory) / 1 42.0 B / 1
> 7 34 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 5
> ms 4 ms 0 ms 0 ms 0.0 B 59.7 KB (memory) / 1 42.0 B / 1
>
> Let me know if you need anything else.
>
> Thanks
>
>
>
>
> On Tue, Apr 5, 2016 at 7:29 PM, Michael Armbrust 
> wrote:
>
>> Can you show your full code.  How are you partitioning the data? How are
>> you reading it?  What is the resulting query plan (run explain() or
>> EXPLAIN).
>>
>> On Tue, Apr 5, 2016 at 10:02 AM, dsing001  wrote:
>>
>>> HI,
>>>
>>> I am using 1.5.2. I have a dataframe which is partitioned based on the
>>> country. So I have around 150 partition in the dataframe. When I run
>>> sparksql and use country = 'UK' it still reads all partitions and not
>>> able
>>> to prune other partitions. Thus all the queries run for similar times
>>> independent of what country I pass. Is it desired?
>>>
>>> Is there a way to fix this in 1.5.2 by using some parameter or is it
>>> fixed
>>> in latest versions?
>>>
>>> Thanks
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-pruning-in-spark-1-5-2-tp26682.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Partition pruning in spark 1.5.2

2016-04-05 Thread Darshan Singh
Thanks. This is not my exact scenario, but I have tried to reproduce it. I
have used 1.5.2.

I have a part_movies dataframe which has 20 partitions, one for each movie.

I created the following query:


val part_sql = sqlContext.sql("select * from part_movies where movie = 10")
part_sql.count()

I expect that this should read from just 1 partition, i.e. partition 10.
For the other partitions it should at most read the metadata, not the data.
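
For comparison, the layout that directory-based pruning applies to would be
data written out with partitionBy. A sketch only (placeholder output path;
this is not what I have done here):

sqlContext.table("part_movies").write.partitionBy("movie").save("/tmp/part_movies_by_movie")

val pruned = sqlContext.read.load("/tmp/part_movies_by_movie").where("movie = 10")
pruned.count()   // should list and read only the movie=10 directory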

Here is the physical plan. I can see the filter, but from here I cannot say
whether this filter is causing any partition pruning. If pruning is actually
happening, I would like to see an operator in the plan which shows it.

== Physical Plan ==
TungstenAggregate(key=[],
functions=[(count(1),mode=Final,isDistinct=false)],
output=[count#75L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[],
functions=[(count(1),mode=Partial,isDistinct=false)],
output=[currentCount#93L])
   Project
Filter (movie#33 = 10)
 InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)],
(InMemoryRelation [movie#33,title#34,genres#35], true, 1,
StorageLevel(true, true, false, true, 1), (Scan
PhysicalRDD[movie#33,title#34,genres#35]), None)


However, my assumption that partitions are not pruned is based not on the
above plan but on the job and its stages: I can see that it has read the full
data of the dataframe. I should see around 65 KB read, as that is roughly the
average size of each partition.

Aggregated Metrics by Executor
Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks Input
Size / Records Shuffle Write Size / Records
driver localhost:53247 0.4 s 20 0 20 1289.0 KB / 20 840.0 B / 20


Task details, first 7 only. Here I expect that, apart from the 1 task which
accesses the partition's data, all the others should read either 0 KB or just
the size of the metadata, after which that partition is discarded because its
data is not needed. But I can see that all the partitions are read.

This is a small example so it doesn't make much difference, but for a large
dataframe, reading all the data, even in memory, takes time.

Tasks

0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39 ms 12
ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41 ms 9
ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40 ms 7
ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6 ms 3
ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
4 31 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 4
ms 6 ms 1 ms 0 ms 0.0 B 69.2 KB (memory) / 1 42.0 B / 1
5 32 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 2
ms 5 ms 0 ms 0 ms 0.0 B 60.3 KB (memory) / 1 42.0 B / 1
6 33 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 3
ms 4 ms 0 ms 0 ms 0.0 B 70.3 KB (memory) / 1 42.0 B / 1
7 34 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 5
ms 4 ms 0 ms 0 ms 0.0 B 59.7 KB (memory) / 1 42.0 B / 1

Let me know if you need anything else.

Thanks




On Tue, Apr 5, 2016 at 7:29 PM, Michael Armbrust 
wrote:

> Can you show your full code.  How are you partitioning the data? How are
> you reading it?  What is the resulting query plan (run explain() or
> EXPLAIN).
>
> On Tue, Apr 5, 2016 at 10:02 AM, dsing001  wrote:
>
>> HI,
>>
>> I am using 1.5.2. I have a dataframe which is partitioned based on the
>> country. So I have around 150 partition in the dataframe. When I run
>> sparksql and use country = 'UK' it still reads all partitions and not able
>> to prune other partitions. Thus all the queries run for similar times
>> independent of what country I pass. Is it desired?
>>
>> Is there a way to fix this in 1.5.2 by using some parameter or is it fixed
>> in latest versions?
>>
>> Thanks
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-pruning-in-spark-1-5-2-tp26682.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Partition pruning in spark 1.5.2

2016-04-05 Thread Michael Armbrust
Can you show your full code? How are you partitioning the data? How are
you reading it? What is the resulting query plan (run explain() or
EXPLAIN)?
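
For reference, either form should work (a sketch reusing the part_sql /
part_movies names from elsewhere in this thread):

part_sql.explain()        // prints the physical plan
part_sql.explain(true)    // extended: logical and physical plans
sqlContext.sql("EXPLAIN SELECT * FROM part_movies WHERE movie = 10").collect().foreach(println)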

On Tue, Apr 5, 2016 at 10:02 AM, dsing001  wrote:

> HI,
>
> I am using 1.5.2. I have a dataframe which is partitioned based on the
> country. So I have around 150 partition in the dataframe. When I run
> sparksql and use country = 'UK' it still reads all partitions and not able
> to prune other partitions. Thus all the queries run for similar times
> independent of what country I pass. Is it desired?
>
> Is there a way to fix this in 1.5.2 by using some parameter or is it fixed
> in latest versions?
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-pruning-in-spark-1-5-2-tp26682.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Partition pruning in spark 1.5.2

2016-04-05 Thread dsing001
Hi,

I am using 1.5.2. I have a dataframe which is partitioned based on country,
so I have around 150 partitions in the dataframe. When I run Spark SQL and
use country = 'UK' it still reads all partitions and is not able to prune the
other partitions. Thus all the queries run for similar times, independent of
which country I pass. Is this the desired behaviour?

Is there a way to fix this in 1.5.2 by using some parameter, or is it fixed
in the latest versions?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Partition-pruning-in-spark-1-5-2-tp26682.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org