The picture is a bit hard to read.

I did a brief search but haven't found a JIRA for this issue.

Consider logging a SPARK JIRA.
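
When filing it, it may help to point out which scan misses the cache. Below is a rough sketch in plain Python, assuming the physical plan has been pasted in as a string. `scan_sources` and `uses_only_cache` are made-up helper names, not Spark APIs, and the sample plan is abbreviated from the one quoted below.

```python
import re

# Scan operator names as they appear in the physical plans quoted in this thread.
CACHED_SCAN = "InMemoryColumnarTableScan"
SOURCE_SCAN = "HiveTableScan"

_SCAN_PATTERN = re.compile(r"\b(%s|%s)\b" % (CACHED_SCAN, SOURCE_SCAN))


def scan_sources(plan_text):
    """Return the scan operators found in a plan string, in order of appearance."""
    return _SCAN_PATTERN.findall(plan_text)


def uses_only_cache(plan_text):
    """True only if the plan has at least one scan and every scan reads the cache."""
    scans = scan_sources(plan_text)
    return bool(scans) and all(scan == CACHED_SCAN for scan in scans)


# Abbreviated by hand from the physical plan in this thread ("..." marks omissions).
plan = """
SortMergeJoin [programme_key#515], [programme_key#1802]
 TungstenSort [programme_key#515 ASC], false, 0
  InMemoryColumnarTableScan [programme_key#515], (InMemoryRelation ...)
 TungstenSort [programme_key#1802 ASC], false, 0
  HiveTableScan [programme_key#1802, ...], (MetastoreRelation default, omnitureweb_log, None)
"""

print(scan_sources(plan))     # one cached scan, one scan against the source table
print(uses_only_cache(plan))  # False: the right side of the join bypasses the cache
```

This only matches operator names in the text, so it is a reporting aid and says nothing about why the planner skipped the cache.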

Cheers

On Fri, Dec 18, 2015 at 4:37 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
> the attached DAG shows that for the same table (self join) SPARK is
> unnecessarily getting data from S3 for one side of the join, whereas it is
> able to use the cache for the other side.
>
>
> Regards,
> Gourav
>
> On Fri, Dec 18, 2015 at 10:29 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a table which is read directly from an S3 location, and even a self
>> join on that cached table causes the data to be read from S3 again.
>>
>> The query plan is shown below:
>>
>> == Parsed Logical Plan ==
>> Aggregate [count(1) AS count#1804L]
>>  Project [user#0,programme_key#515]
>>   Join Inner, Some((programme_key#515 = programme_key#1802))
>>    Subquery left_table
>>     Subquery omniture
>>      Project [scv_id#4 AS user#0,programme_key#515]
>>       Filter (((hit_month#2 IN (2015-11) && hit_day#3 IN (cast(20 as string))) && (is_logged_in#8L = cast(1 as bigint))) && (is_4od_video_view#40L = cast(1 as bigint)))
>>        MetastoreRelation default, omnitureweb_log, None
>>    Subquery right_table
>>     Subquery omniture
>>      Project [scv_id#1291 AS user#771,programme_key#1802]
>>       Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (cast(20 as string))) && (is_logged_in#1295L = cast(1 as bigint))) && (is_4od_video_view#1327L = cast(1 as bigint)))
>>        MetastoreRelation default, omnitureweb_log, None
>>
>> == Analyzed Logical Plan ==
>> count: bigint
>> Aggregate [count(1) AS count#1804L]
>>  Project [user#0,programme_key#515]
>>   Join Inner, Some((programme_key#515 = programme_key#1802))
>>    Subquery left_table
>>     Subquery omniture
>>      Project [scv_id#4 AS user#0,programme_key#515]
>>       Filter (((hit_month#2 IN (2015-11) && hit_day#3 IN (cast(20 as string))) && (is_logged_in#8L = cast(1 as bigint))) && (is_4od_video_view#40L = cast(1 as bigint)))
>>        MetastoreRelation default, omnitureweb_log, None
>>    Subquery right_table
>>     Subquery omniture
>>      Project [scv_id#1291 AS user#771,programme_key#1802]
>>       Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (cast(20 as string))) && (is_logged_in#1295L = cast(1 as bigint))) && (is_4od_video_view#1327L = cast(1 as bigint)))
>>        MetastoreRelation default, omnitureweb_log, None
>>
>> == Optimized Logical Plan ==
>> Aggregate [count(1) AS count#1804L]
>>  Project
>>   Join Inner, Some((programme_key#515 = programme_key#1802))
>>    Project [programme_key#515]
>>     InMemoryRelation [user#0,programme_key#515], true, 10000, StorageLevel(true, true, false, true, 1), (Project [scv_id#4 AS user#0,programme_key#515]), None
>>    Project [programme_key#1802]
>>     Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (20)) && (is_logged_in#1295L = 1)) && (is_4od_video_view#1327L = 1))
>>      MetastoreRelation default, omnitureweb_log, None
>>
>> == Physical Plan ==
>> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#1804L])
>>  TungstenExchange SinglePartition
>>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#1817L])
>>    TungstenProject
>>     SortMergeJoin [programme_key#515], [programme_key#1802]
>>      TungstenSort [programme_key#515 ASC], false, 0
>>       TungstenExchange hashpartitioning(programme_key#515)
>>        ConvertToUnsafe
>>         InMemoryColumnarTableScan [programme_key#515], (InMemoryRelation [user#0,programme_key#515], true, 10000, StorageLevel(true, true, false, true, 1), (Project [scv_id#4 AS user#0,programme_key#515]), None)
>>      TungstenSort [programme_key#1802 ASC], false, 0
>>       TungstenExchange hashpartitioning(programme_key#1802)
>>        ConvertToUnsafe
>>         Project [programme_key#1802]
>>          Filter ((is_logged_in#1295L = 1) && (is_4od_video_view#1327L = 1))
>>           HiveTableScan [programme_key#1802,is_logged_in#1295L,is_4od_video_view#1327L], (MetastoreRelation default, omnitureweb_log, None), [hit_month#1289 IN (2015-11),hit_day#1290 IN (20)]
>>
>> Code Generation: true
>>
>>
>>
>> Regards,
>> Gourav
>>
>> On Fri, Dec 18, 2015 at 8:55 AM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> hi,
>>>
>>> I think that people have reported the same issue elsewhere, and this
>>> should be registered as a bug in SPARK:
>>>
>>> https://forums.databricks.com/questions/2142/self-join-in-spark-sql.html
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Thu, Dec 17, 2015 at 10:52 AM, Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi Ted,
>>>>
>>>> The self join works fine on tables where the HiveContext tables are
>>>> direct Hive tables, for example:
>>>>
>>>> table1 = hiveContext.sql("select columnA, columnB from hivetable1")
>>>> table1.registerTempTable("table1")
>>>> table1.cache()
>>>> table1.count()
>>>>
>>>> and if I do a self join on table1, things work fine.
>>>>
>>>> But in case we have something like this:
>>>> table1 = hiveContext.sql("select columnA, columnB from hivetable1")
>>>> table1.registerTempTable("table1")
>>>> table1.cache()
>>>> table1.count()
>>>>
>>>> table2 = hiveContext.sql("select columnA, columnB from hivetable2")
>>>> table2.registerTempTable("table2")
>>>> table2.cache()
>>>> table2.count()
>>>>
>>>> table3 = hiveContext.sql("select table1.* from table1, table2 where table1.columnA = table2.columnA")
>>>> table3.registerTempTable("table3")
>>>> table3.cache()
>>>> table3.count()
>>>>
>>>>
>>>> then the self join on table3 does not take data from the table3 cache,
>>>> nor from the table1 or table2 cache, but reads it directly from S3,
>>>> which, as you would understand, does not make any sense.
>>>>
>>>>
>>>> Regards,
>>>> Gourav
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Dec 16, 2015 at 7:16 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> I did the following exercise in spark-shell ("c" is cached table):
>>>>>
>>>>> scala> sqlContext.sql("select x.b from c x join c y on x.a = y.a").explain
>>>>> == Physical Plan ==
>>>>> Project [b#4]
>>>>> +- BroadcastHashJoin [a#3], [a#125], BuildRight
>>>>>    :- InMemoryColumnarTableScan [b#4,a#3], InMemoryRelation [a#3,b#4,c#5], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, Some(c)
>>>>>    +- InMemoryColumnarTableScan [a#125], InMemoryRelation [a#125,b#126,c#127], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, Some(c)
>>>>>
>>>>> sqlContext.sql("select x.b, y.c from c x join c y on x.a = y.a").registerTempTable("d")
>>>>> scala> sqlContext.cacheTable("d")
>>>>>
>>>>> scala> sqlContext.sql("select x.b from d x join d y on x.c = y.c").explain
>>>>> == Physical Plan ==
>>>>> Project [b#4]
>>>>> +- SortMergeJoin [c#90], [c#253]
>>>>>    :- Sort [c#90 ASC], false, 0
>>>>>    :  +- TungstenExchange hashpartitioning(c#90,200), None
>>>>>    :     +- InMemoryColumnarTableScan [b#4,c#90], InMemoryRelation [b#4,c#90], true, 10000, StorageLevel(true, true, false, true, 1), Project [b#4,c#90], Some(d)
>>>>>    +- Sort [c#253 ASC], false, 0
>>>>>       +- TungstenExchange hashpartitioning(c#253,200), None
>>>>>          +- InMemoryColumnarTableScan [c#253], InMemoryRelation [b#246,c#253], true, 10000, StorageLevel(true, true, false, true, 1), Project [b#4,c#90], Some(d)
>>>>>
>>>>> Is the above what you observed?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Wed, Dec 16, 2015 at 9:34 AM, Gourav Sengupta <
>>>>> gourav.sengu...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> This is how the data can be created:
>>>>>>
>>>>>> 1. TableA : cached()
>>>>>> 2. TableB : cached()
>>>>>> 3. TableC: TableA inner join TableB cached()
>>>>>> 4. TableC join TableC does not take the data from cache but starts
>>>>>> reading the data for TableA and TableB from disk.
>>>>>>
>>>>>> Does this sound like a bug? The self joins on TableA and on TableB
>>>>>> work fine and take data from the cache.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Gourav
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
