The picture is a bit hard to read. I did a brief search but haven't found a
JIRA for this issue. Consider logging a SPARK JIRA.
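As a rough alternative to the screenshot, the same information can be captured
as text by printing the query plan. A minimal sketch (the DataFrame name
"joined" and the join column are placeholders; it assumes the cached temp table
"table3" from the mail quoted below):

# Hypothetical self join on the cached table, only to inspect its plan.
joined = hiveContext.sql(
    "select a.* from table3 a join table3 b on a.columnA = b.columnA")

# explain(True) prints the parsed, analyzed, optimized and physical plans.
# A cache hit shows up as an InMemoryColumnarTableScan; a side that falls
# back to S3 shows up as a HiveTableScan on the underlying table instead.
joined.explain(True)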
Cheers

On Fri, Dec 18, 2015 at 4:37 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> Hi,
>
> the attached DAG shows that for the same table (self join) SPARK is
> unnecessarily getting data from S3 for one side of the join, whereas it is
> able to use the cache for the other side.
>
> Regards,
> Gourav
>
> On Fri, Dec 18, 2015 at 10:29 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a table which is read directly from an S3 location, and even a self
>> join on that cached table is causing the data to be read from S3 again.
>>
>> The query plan is mentioned below:
>>
>> == Parsed Logical Plan ==
>> Aggregate [count(1) AS count#1804L]
>> Project [user#0,programme_key#515]
>> Join Inner, Some((programme_key#515 = programme_key#1802))
>> Subquery left_table
>> Subquery omniture
>> Project [scv_id#4 AS user#0,programme_key#515]
>> Filter (((hit_month#2 IN (2015-11) && hit_day#3 IN (cast(20 as string))) && (is_logged_in#8L = cast(1 as bigint))) && (is_4od_video_view#40L = cast(1 as bigint)))
>> MetastoreRelation default, omnitureweb_log, None
>> Subquery right_table
>> Subquery omniture
>> Project [scv_id#1291 AS user#771,programme_key#1802]
>> Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (cast(20 as string))) && (is_logged_in#1295L = cast(1 as bigint))) && (is_4od_video_view#1327L = cast(1 as bigint)))
>> MetastoreRelation default, omnitureweb_log, None
>>
>> == Analyzed Logical Plan ==
>> count: bigint
>> Aggregate [count(1) AS count#1804L]
>> Project [user#0,programme_key#515]
>> Join Inner, Some((programme_key#515 = programme_key#1802))
>> Subquery left_table
>> Subquery omniture
>> Project [scv_id#4 AS user#0,programme_key#515]
>> Filter (((hit_month#2 IN (2015-11) && hit_day#3 IN (cast(20 as string))) && (is_logged_in#8L = cast(1 as bigint))) && (is_4od_video_view#40L = cast(1 as bigint)))
>> MetastoreRelation default, omnitureweb_log, None
>> Subquery right_table
>> Subquery omniture
>> Project [scv_id#1291 AS user#771,programme_key#1802]
>> Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (cast(20 as string))) && (is_logged_in#1295L = cast(1 as bigint))) && (is_4od_video_view#1327L = cast(1 as bigint)))
>> MetastoreRelation default, omnitureweb_log, None
>>
>> == Optimized Logical Plan ==
>> Aggregate [count(1) AS count#1804L]
>> Project
>> Join Inner, Some((programme_key#515 = programme_key#1802))
>> Project [programme_key#515]
>> InMemoryRelation [user#0,programme_key#515], true, 10000, StorageLevel(true, true, false, true, 1), (Project [scv_id#4 AS user#0,programme_key#515]), None
>> Project [programme_key#1802]
>> Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (20)) && (is_logged_in#1295L = 1)) && (is_4od_video_view#1327L = 1))
>> MetastoreRelation default, omnitureweb_log, None
>>
>> == Physical Plan ==
>> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#1804L])
>> TungstenExchange SinglePartition
>> TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#1817L])
>> TungstenProject
>> SortMergeJoin [programme_key#515], [programme_key#1802]
>> TungstenSort [programme_key#515 ASC], false, 0
>> TungstenExchange hashpartitioning(programme_key#515)
>> ConvertToUnsafe
>> InMemoryColumnarTableScan [programme_key#515], (InMemoryRelation [user#0,programme_key#515], true, 10000, StorageLevel(true, true, false, true, 1), (Project [scv_id#4 AS user#0,programme_key#515]), None)
>> TungstenSort [programme_key#1802 ASC], false, 0
>> TungstenExchange hashpartitioning(programme_key#1802)
>> ConvertToUnsafe
>> Project [programme_key#1802]
>> Filter ((is_logged_in#1295L = 1) && (is_4od_video_view#1327L = 1))
>> HiveTableScan [programme_key#1802,is_logged_in#1295L,is_4od_video_view#1327L], (MetastoreRelation default, omnitureweb_log, None), [hit_month#1289 IN (2015-11),hit_day#1290 IN (20)]
>>
>> Code Generation: true
>>
>> Regards,
>> Gourav
>>
>> On Fri, Dec 18, 2015 at 8:55 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I think that people have reported the same issue elsewhere, and this
>>> should be registered as a bug in SPARK:
>>>
>>> https://forums.databricks.com/questions/2142/self-join-in-spark-sql.html
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Thu, Dec 17, 2015 at 10:52 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi Ted,
>>>>
>>>> The self join works fine on tables where the HiveContext tables are
>>>> direct Hive tables, therefore
>>>>
>>>> table1 = hiveContext.sql("select columnA, columnB from hivetable1")
>>>> table1.registerTempTable("table1")
>>>> table1.cache()
>>>> table1.count()
>>>>
>>>> and if I do a self join on table1 things are quite fine.
>>>>
>>>> But in case we have something like this:
>>>>
>>>> table1 = hiveContext.sql("select columnA, columnB from hivetable1")
>>>> table1.registerTempTable("table1")
>>>> table1.cache()
>>>> table1.count()
>>>>
>>>> table2 = hiveContext.sql("select columnA, columnB from hivetable2")
>>>> table2.registerTempTable("table2")
>>>> table2.cache()
>>>> table2.count()
>>>>
>>>> table3 = hiveContext.sql("select table1.* from table1, table2 where table1.columnA = table2.columnA")
>>>> table3.registerTempTable("table3")
>>>> table3.cache()
>>>> table3.count()
>>>>
>>>> then the self join on table3 does not take data from the table3 cache,
>>>> nor from the table1 or table2 cache, but starts taking data directly from
>>>> S3 - which, as you would understand, does not make any sense.
>>>>
>>>> Regards,
>>>> Gourav
>>>>
>>>> On Wed, Dec 16, 2015 at 7:16 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> I did the following exercise in spark-shell ("c" is a cached table):
>>>>>
>>>>> scala> sqlContext.sql("select x.b from c x join c y on x.a = y.a").explain
>>>>> == Physical Plan ==
>>>>> Project [b#4]
>>>>> +- BroadcastHashJoin [a#3], [a#125], BuildRight
>>>>>    :- InMemoryColumnarTableScan [b#4,a#3], InMemoryRelation [a#3,b#4,c#5], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, Some(c)
>>>>>    +- InMemoryColumnarTableScan [a#125], InMemoryRelation [a#125,b#126,c#127], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, Some(c)
>>>>>
>>>>> sqlContext.sql("select x.b, y.c from c x join c y on x.a = y.a").registerTempTable("d")
>>>>> scala> sqlContext.cacheTable("d")
>>>>>
>>>>> scala> sqlContext.sql("select x.b from d x join d y on x.c = y.c").explain
>>>>> == Physical Plan ==
>>>>> Project [b#4]
>>>>> +- SortMergeJoin [c#90], [c#253]
>>>>>    :- Sort [c#90 ASC], false, 0
>>>>>    :  +- TungstenExchange hashpartitioning(c#90,200), None
>>>>>    :     +- InMemoryColumnarTableScan [b#4,c#90], InMemoryRelation [b#4,c#90], true, 10000, StorageLevel(true, true, false, true, 1), Project [b#4,c#90], Some(d)
>>>>>    +- Sort [c#253 ASC], false, 0
>>>>>       +- TungstenExchange hashpartitioning(c#253,200), None
>>>>>          +- InMemoryColumnarTableScan [c#253], InMemoryRelation [b#246,c#253], true, 10000, StorageLevel(true, true, false, true, 1), Project [b#4,c#90], Some(d)
>>>>>
>>>>> Is the above what you observed?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Wed, Dec 16, 2015 at 9:34 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> This is how the data can be created:
>>>>>>
>>>>>> 1. TableA: cached()
>>>>>> 2. TableB: cached()
>>>>>> 3. TableC: TableA inner join TableB, cached()
>>>>>> 4. TableC join TableC does not take the data from cache but starts
>>>>>> reading the data for TableA and TableB from disk.
>>>>>>
>>>>>> Does this sound like a bug? The self join between TableA and TableB
>>>>>> is working fine and taking data from cache.
>>>>>>
>>>>>> Regards,
>>>>>> Gourav
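For reference, a minimal end-to-end sketch of the scenario described above.
The Hive table and column names below are placeholders (not the actual
omnitureweb_log schema), and the exact behaviour may depend on the Spark
version:

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="self-join-cache-check")
hiveContext = HiveContext(sc)

# TableA and TableB: cached projections of two (placeholder) Hive tables.
tableA = hiveContext.sql("select columnA, columnB from hivetable1")
tableA.registerTempTable("tableA")
tableA.cache()
tableA.count()  # materialise the cache

tableB = hiveContext.sql("select columnA, columnB from hivetable2")
tableB.registerTempTable("tableB")
tableB.cache()
tableB.count()

# TableC: inner join of the two cached tables, itself cached.
tableC = hiveContext.sql(
    "select tableA.* from tableA, tableB where tableA.columnA = tableB.columnA")
tableC.registerTempTable("tableC")
tableC.cache()
tableC.count()

# Self join on TableC: per the report, one side of this join goes back to
# the source (HiveTableScan / S3) instead of the cache.
selfJoin = hiveContext.sql(
    "select x.columnB from tableC x join tableC y on x.columnA = y.columnA")
selfJoin.explain(True)

If the physical plan shows an InMemoryColumnarTableScan on both sides of the
last join, the cache is being used; a HiveTableScan on either side matches
the behaviour reported in this thread.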