Hi,

the attached DAG shows that for a self join on the same table, Spark is
unnecessarily reading data from S3 for one side of the join, whereas it is
able to use the cache for the other side.
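
The pattern that reproduces this is roughly the following (a minimal
sketch; the table and column names here are placeholders, not the real
schema):

# cache a table sourced from S3 via the Hive metastore
t = hiveContext.sql("select columnA, columnB from some_s3_backed_table")
t.registerTempTable("t")
t.cache()
t.count()  # materialise the cache

# self join on the cached temp table
j = hiveContext.sql("select x.columnA from t x join t y on x.columnA = y.columnA")
j.explain()
# expected: InMemoryColumnarTableScan on both sides of the join;
# observed: one side is a HiveTableScan reading from S3 instead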


Regards,
Gourav

On Fri, Dec 18, 2015 at 10:29 AM, Gourav Sengupta <gourav.sengu...@gmail.com
> wrote:

> Hi,
>
> I have a table which is read directly from an S3 location, and even a self
> join on that cached table causes the data to be read from S3 again.
>
> The query plan is shown below:
>
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#1804L]
>  Project [user#0,programme_key#515]
>   Join Inner, Some((programme_key#515 = programme_key#1802))
>    Subquery left_table
>     Subquery omniture
>      Project [scv_id#4 AS user#0,programme_key#515]
>       Filter (((hit_month#2 IN (2015-11) && hit_day#3 IN (cast(20 as 
> string))) && (is_logged_in#8L = cast(1 as bigint))) && (is_4od_video_view#40L 
> = cast(1 as bigint)))
>        MetastoreRelation default, omnitureweb_log, None
>    Subquery right_table
>     Subquery omniture
>      Project [scv_id#1291 AS user#771,programme_key#1802]
>       Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (cast(20 as 
> string))) && (is_logged_in#1295L = cast(1 as bigint))) && 
> (is_4od_video_view#1327L = cast(1 as bigint)))
>        MetastoreRelation default, omnitureweb_log, None
>
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#1804L]
>  Project [user#0,programme_key#515]
>   Join Inner, Some((programme_key#515 = programme_key#1802))
>    Subquery left_table
>     Subquery omniture
>      Project [scv_id#4 AS user#0,programme_key#515]
>       Filter (((hit_month#2 IN (2015-11) && hit_day#3 IN (cast(20 as 
> string))) && (is_logged_in#8L = cast(1 as bigint))) && (is_4od_video_view#40L 
> = cast(1 as bigint)))
>        MetastoreRelation default, omnitureweb_log, None
>    Subquery right_table
>     Subquery omniture
>      Project [scv_id#1291 AS user#771,programme_key#1802]
>       Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (cast(20 as 
> string))) && (is_logged_in#1295L = cast(1 as bigint))) && 
> (is_4od_video_view#1327L = cast(1 as bigint)))
>        MetastoreRelation default, omnitureweb_log, None
>
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#1804L]
>  Project
>   Join Inner, Some((programme_key#515 = programme_key#1802))
>    Project [programme_key#515]
>     InMemoryRelation [user#0,programme_key#515], true, 10000, 
> StorageLevel(true, true, false, true, 1), (Project [scv_id#4 AS 
> user#0,programme_key#515]), None
>    Project [programme_key#1802]
>     Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (20)) && 
> (is_logged_in#1295L = 1)) && (is_4od_video_view#1327L = 1))
>      MetastoreRelation default, omnitureweb_log, None
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], 
> output=[count#1804L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], 
> output=[currentCount#1817L])
>    TungstenProject
>     SortMergeJoin [programme_key#515], [programme_key#1802]
>      TungstenSort [programme_key#515 ASC], false, 0
>       TungstenExchange hashpartitioning(programme_key#515)
>        ConvertToUnsafe
>         InMemoryColumnarTableScan [programme_key#515], (InMemoryRelation 
> [user#0,programme_key#515], true, 10000, StorageLevel(true, true, false, 
> true, 1), (Project [scv_id#4 AS user#0,programme_key#515]), None)
>      TungstenSort [programme_key#1802 ASC], false, 0
>       TungstenExchange hashpartitioning(programme_key#1802)
>        ConvertToUnsafe
>         Project [programme_key#1802]
>          Filter ((is_logged_in#1295L = 1) && (is_4od_video_view#1327L = 1))
>           HiveTableScan 
> [programme_key#1802,is_logged_in#1295L,is_4od_video_view#1327L], 
> (MetastoreRelation default, omnitureweb_log, None), [hit_month#1289 IN 
> (2015-11),hit_day#1290 IN (20)]
>
> Code Generation: true
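>
> For reference, the statements behind this plan were roughly as follows
> (the table and column names are taken from the plan itself; the exact SQL
> is paraphrased, so treat it as a sketch):
>
> omniture = hiveContext.sql("""
>     select scv_id as user, programme_key
>     from omnitureweb_log
>     where hit_month in ('2015-11') and hit_day in ('20')
>       and is_logged_in = 1 and is_4od_video_view = 1""")
> omniture.registerTempTable("omniture")
> omniture.cache()
> omniture.count()
>
> # self join on the cached temp table - this produced the plan above
> hiveContext.sql("""
>     select count(1)
>     from omniture left_table
>     join omniture right_table
>       on left_table.programme_key = right_table.programme_key""").explain()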
>
>
>
> Regards,
> Gourav
>
> On Fri, Dec 18, 2015 at 8:55 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>> I think that people have reported the same issue elsewhere, and this
>> should be registered as a bug in Spark:
>>
>> https://forums.databricks.com/questions/2142/self-join-in-spark-sql.html
>>
>>
>> Regards,
>> Gourav
>>
>> On Thu, Dec 17, 2015 at 10:52 AM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi Ted,
>>>
>>> The self join works fine on tables that are registered directly from
>>> Hive tables through the HiveContext, for example:
>>>
>>> table1 = hiveContext.sql("select columnA, columnB from hivetable1")
>>> table1.registerTempTable("table1")
>>> table1.cache()
>>> table1.count()
>>>
>>> and if I do a self join on table1, things are quite fine.
>>>
>>> But if we have something like this:
>>> table1 = hiveContext.sql("select columnA, columnB from hivetable1")
>>> table1.registerTempTable("table1")
>>> table1.cache()
>>> table1.count()
>>>
>>> table2 = hiveContext.sql("select columnA, columnB from hivetable2")
>>> table2.registerTempTable("table2")
>>> table2.cache()
>>> table2.count()
>>>
>>> table3 = hiveContext.sql("select table1.* from table1 join table2 on
>>> table1.columnA = table2.columnA")
>>> table3.registerTempTable("table3")
>>> table3.cache()
>>> table3.count()
>>>
>>>
>>> then a self join on table3 does not take data from the table3 cache, nor
>>> from the table1 or table2 caches, but starts reading data directly from S3
>>> - which, as you would understand, does not make any sense.
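>>>
>>> Concretely, the failing step looks like this (a sketch; table4 and the
>>> join condition are illustrative, following the same pattern as above):
>>>
>>> table4 = hiveContext.sql("select a.columnA from table3 a join table3 b
>>> on a.columnA = b.columnA")
>>> table4.explain()
>>> # expected: both sides of the join scan the table3 InMemoryRelation;
>>> # observed: at least one side falls back to a HiveTableScan over S3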
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Dec 16, 2015 at 7:16 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> I did the following exercise in spark-shell ("c" is a cached table):
>>>>
>>>> scala> sqlContext.sql("select x.b from c x join c y on x.a =
>>>> y.a").explain
>>>> == Physical Plan ==
>>>> Project [b#4]
>>>> +- BroadcastHashJoin [a#3], [a#125], BuildRight
>>>>    :- InMemoryColumnarTableScan [b#4,a#3], InMemoryRelation
>>>> [a#3,b#4,c#5], true, 10000, StorageLevel(true, true, false, true, 1),
>>>> ConvertToUnsafe, Some(c)
>>>>    +- InMemoryColumnarTableScan [a#125], InMemoryRelation
>>>> [a#125,b#126,c#127], true, 10000, StorageLevel(true, true, false, true, 1),
>>>> ConvertToUnsafe, Some(c)
>>>>
>>>> sqlContext.sql("select x.b, y.c from c x join c y on x.a =
>>>> y.a").registerTempTable("d")
>>>> scala> sqlContext.cacheTable("d")
>>>>
>>>> scala> sqlContext.sql("select x.b from d x join d y on x.c =
>>>> y.c").explain
>>>> == Physical Plan ==
>>>> Project [b#4]
>>>> +- SortMergeJoin [c#90], [c#253]
>>>>    :- Sort [c#90 ASC], false, 0
>>>>    :  +- TungstenExchange hashpartitioning(c#90,200), None
>>>>    :     +- InMemoryColumnarTableScan [b#4,c#90], InMemoryRelation
>>>> [b#4,c#90], true, 10000, StorageLevel(true, true, false, true, 1), Project
>>>> [b#4,c#90], Some(d)
>>>>    +- Sort [c#253 ASC], false, 0
>>>>       +- TungstenExchange hashpartitioning(c#253,200), None
>>>>          +- InMemoryColumnarTableScan [c#253], InMemoryRelation
>>>> [b#246,c#253], true, 10000, StorageLevel(true, true, false, true, 1),
>>>> Project [b#4,c#90], Some(d)
>>>>
>>>> Is the above what you observed?
>>>>
>>>> Cheers
>>>>
>>>> On Wed, Dec 16, 2015 at 9:34 AM, Gourav Sengupta <
>>>> gourav.sengu...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> This is how the issue can be reproduced:
>>>>>
>>>>> 1. TableA : cached()
>>>>> 2. TableB : cached()
>>>>> 3. TableC: TableA inner join TableB, cached()
>>>>> 4. TableC join TableC does not take the data from cache but starts
>>>>> reading the data for TableA and TableB from disk.
>>>>>
>>>>> Does this sound like a bug? Self joins on TableA and on TableB work
>>>>> fine and take data from the cache.
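>>>>>
>>>>> In code, the four steps look roughly like this (a minimal sketch; the
>>>>> table names and the join column "key" are illustrative):
>>>>>
>>>>> from pyspark.sql.functions import col
>>>>>
>>>>> tableA = hiveContext.table("hive_table_a").cache()  # 1
>>>>> tableB = hiveContext.table("hive_table_b").cache()  # 2
>>>>> tableC = tableA.join(tableB, "key").cache()  # 3: inner join on "key"
>>>>> tableC.count()  # materialise the cache
>>>>>
>>>>> # 4: the self join below re-reads TableA and TableB from disk
>>>>> selfJoin = tableC.alias("x").join(tableC.alias("y"),
>>>>>                                   col("x.key") == col("y.key"))
>>>>> selfJoin.explain()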
>>>>>
>>>>>
>>>>> Regards,
>>>>> Gourav
>>>>>
>>>>
>>>>
>>>
>>
>