Hi all, I have a question about the text below.
In "sortMergeJoin[c1#41,c1#98]", what do the 41 and 98 stand for, please? Thanks :)

---Original---
From: "Swapnil Shinde" <swapnilushi...@gmail.com>
Date: 2017/2/11 07:38:42
To: "Yong Zhang" <java8...@hotmail.com>;
Cc: "user@spark.apache.org" <user@spark.apache.org>;
Subject: Re: Spark's execution plan debugging

Thanks for your reply. I agree with your explanation of caching and can see that it works as expected. However, I am running the given snippet on Spark 2.0.1, and even with caching I can see it going back to dataframes a & b.

On Thu, Feb 9, 2017 at 3:41 PM, Yong Zhang <java8...@hotmail.com> wrote:

You may misunderstand what the cache means. Caching a DF just means the data can be retrieved directly from memory, instead of going to the parent dependency to get the data. In your example, even if C is cached, if you have 2 DFs derived from it, then C will still be scanned 2 times in your application; those scans are just served directly from memory instead of going back to the A/B DFs that C is derived from.

When Spark builds the execution plan, it can find out whether any DFs in the chain are cached and then generates the right plan accordingly, as shown in the following example (tested with Spark 1.6.3). As you can see, if C is NOT cached, then your X has to go back to A/B (scanning the existing RDDs), but after C is cached, Spark reads it through "InMemoryColumnarTableScan". Caching has nothing to do with how many times the data is scanned.

scala> x.explain
== Physical Plan ==
SortMergeJoin [c1#41], [c1#98]
:- SortMergeJoin [c1#41], [d1#45]
:  :- Sort [c1#41 ASC], false, 0
:  :  +- TungstenExchange hashpartitioning(c1#41,200), None
:  :     +- Project [_1#39 AS c1#41,_2#40 AS c2#42]
:  :        +- Filter (_1#39 = a)
:  :           +- Scan ExistingRDD[_1#39,_2#40]
:  +- Sort [d1#45 ASC], false, 0
:     +- TungstenExchange hashpartitioning(d1#45,200), None
:        +- Project [_1#43 AS d1#45,_2#44 AS d2#46]
:           +- Scan ExistingRDD[_1#43,_2#44]
+- SortMergeJoin [c1#98], [d1#102]
   :- Sort [c1#98 ASC], false, 0
   :  +- TungstenExchange hashpartitioning(c1#98,200), None
   :     +- Project [_1#39 AS c1#98,_2#40 AS c2#99]
   :        +- Scan ExistingRDD[_1#39,_2#40]
   +- Sort [d1#102 ASC], false, 0
      +- TungstenExchange hashpartitioning(d1#102,200), None
         +- Project [_1#43 AS d1#102,_2#44 AS d2#103]
            +- Filter (_1#43 = b)
               +- Scan ExistingRDD[_1#43,_2#44]

scala> c.cache
res17: c.type = [c1: string, c2: int, d1: string, d2: int]

scala> x.explain
== Physical Plan ==
SortMergeJoin [c1#41], [c1#98]
:- Filter (c1#41 = a)
:  +- InMemoryColumnarTableScan [c1#41,c2#42,d1#45,d2#46], [(c1#41 = a)], InMemoryRelation [c1#41,c2#42,d1#45,d2#46], true, 10000, StorageLevel(true, true, false, true, 1), SortMergeJoin [c1#41], [d1#45], None
+- Sort [c1#98 ASC], false, 0
   +- TungstenExchange hashpartitioning(c1#98,200), None
      +- Filter (d1#102 = b)
         +- InMemoryColumnarTableScan [c1#98,c2#99,d1#102,d2#103], [(d1#102 = b)], InMemoryRelation [c1#98,c2#99,d1#102,d2#103], true, 10000, StorageLevel(true, true, false, true, 1), SortMergeJoin [c1#41], [d1#45], None
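One point worth noting alongside the plans above, as a minimal sketch only and assuming the same a/b/c/x DataFrames as in the snippet quoted further down: cache() is lazy, so the in-memory copy of c is not materialized until an action runs against it, and forcing an action first makes it easier to tell whether later jobs really read from the cache.

scala> c.cache      // lazy: only marks c for caching, nothing is materialized yet
scala> c.count      // action: runs the a/b join once and fills the cache
scala> x.explain    // if the cached plan is matched, the a/b scans are replaced by
                    // InMemoryColumnarTableScan nodes, as in the output above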
From: Swapnil Shinde <swapnilushi...@gmail.com>
Sent: Thursday, February 9, 2017 2:53 PM
To: user@spark.apache.org
Subject: Re: Spark's execution plan debugging

Any suggestions, please..

On Wed, Feb 8, 2017 at 12:02 PM, Swapnil Shinde <swapnilushi...@gmail.com> wrote:

Hello
    I am trying to figure out how Spark generates its execution plan with and without caching. I have this small example to illustrate what I am doing:

val a = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3))).toDF("c1", "c2")
val b = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3))).toDF("d1", "d2")
val c = a.join(b, $"c1" === $"d1")
val d1 = c.map(x => { val c1 = x.getAs[String]("c1"); val c2 = x.getAs[Int]("c2"); (c1, c2 * 2) }).toDF("nC1", "nC2")
val d2 = c.map(x => { val d1 = x.getAs[String]("d1"); val d2 = x.getAs[Int]("d2"); (d1, d2 * 3) }).toDF("nD1", "nD2")
val x = d1.as("a").join(d2.as("b"), $"a.nC1" === $"b.nD1")

A generic DAG for dataframe 'x' would be something like this (Fig 1):

[image: Fig 1, generic DAG for dataframe 'x']

Obviously, the physical plan (x.explain) generates something like this (without any caching):

[image: physical plan for 'x' without caching]

I am interpreting this as:

[image: interpretation of the uncached plan]

As per my understanding, dataframe 'c' is being used twice, so it would be good to cache it. I was hoping that if I cache 'c', the execution plan would look like the generic one explained above in Fig 1. However, I don't see it that way. Correct me if my interpretation of the plan is wrong (here 'c' is cached):

[image: physical plan for 'x' with 'c' cached]

I don't think caching 'c' is helping anyway. Basically, the input dataframes 'a' & 'b' are being fetched twice. (In this example a and b are dataframes generated from a local collection, but in the real world they come from large files.)

Question:
    Why doesn't caching 'c' build a physical plan in which 'a' & 'b' are fetched only once, 'c' is then generated, and d1 and d2 are built in parallel to provide the input for x (like Fig 1)?

I understand I am missing something very basic about execution plans, so please correct me if I am wrong anywhere.

Thanks
Swapnil
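For the large-file case, one possible approach, sketched here only and not taken from the thread, is to materialize 'c' to storage and rebuild d1, d2 and x from the saved copy, which cuts the lineage so 'a' and 'b' are read exactly once; the output path and the cSaved/d1s/d2s/xs names are illustrative, and sqlContext is assumed to be the one available in the same spark-shell session as the snippet above.

c.write.parquet("/tmp/c_joined")                        // illustrative path only
val cSaved = sqlContext.read.parquet("/tmp/c_joined")   // lineage now starts at the parquet scan

val d1s = cSaved.map(r => (r.getAs[String]("c1"), r.getAs[Int]("c2") * 2)).toDF("nC1", "nC2")
val d2s = cSaved.map(r => (r.getAs[String]("d1"), r.getAs[Int]("d2") * 3)).toDF("nD1", "nD2")
val xs  = d1s.as("a").join(d2s.as("b"), $"a.nC1" === $"b.nD1")
xs.explain   // the plan scans the saved copy of c instead of going back to a and b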