Re: Mapper side join with DataFrames API
Hello guys,

No help yet. Can someone help with a reply to the above question on SO?

Thanks
Deepak

On Fri, Mar 4, 2016 at 5:32 PM, Deepak Gopalakrishnan wrote:
> Have added this to SO, can you guys share any thoughts?
>
> http://stackoverflow.com/questions/35795518/spark-1-6-spills-to-disk-even-when-there-is-enough-memory

--
Regards,
*Deepak Gopalakrishnan*
*Mobile*:+918891509774
*Skype* : deepakgk87
http://myexps.blogspot.com
Re: Mapper side join with DataFrames API
I have added this to SO; can you guys share any thoughts?

http://stackoverflow.com/questions/35795518/spark-1-6-spills-to-disk-even-when-there-is-enough-memory

On Thu, Mar 3, 2016 at 7:06 AM, Deepak Gopalakrishnan wrote:
> Hello,
>
> I'm using 1.6.0 on EMR
Re: Mapper side join with DataFrames API
Hello,

I'm using Spark 1.6.0 on EMR.

On Thu, Mar 3, 2016 at 12:34 AM, Yong Zhang wrote:
> What version of Spark are you using?
>
> I am also trying to figure out how to do a map-side join in Spark.
>
> In 1.5.x, there is a broadcast function for DataFrames, and it caused an
> OOM in my simple test case, even though one side of the join is very small.
>
> I am still trying to find the root cause.
>
> Yong
>
> ------
> Date: Wed, 2 Mar 2016 15:38:29 +0530
> Subject: Re: Mapper side join with DataFrames API
> From: dgk...@gmail.com
> To: mich...@databricks.com
> CC: user@spark.apache.org
>
> Thanks for the help, guys.
>
> Just to ask part of my question a little differently:
>
> I have attached my screenshots here. There is so much memory that is
> unused, and yet there is a spill (as in the screenshots). Any idea why?
>
> Thanks
> Deepak
>
> On Wed, Mar 2, 2016 at 5:14 AM, Michael Armbrust wrote:
>
> It's helpful to always include the output of df.explain(true) when you
> are asking about performance.
>
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
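For context on the technique this thread is about: a map-side join (Spark calls it a broadcast hash join) copies the small side to every task, builds an in-memory hash table keyed on the join column, and streams the large side through it, so the large side never has to be shuffled or sorted. The plain-Java sketch below only illustrates that build/probe idea under stated assumptions: it does not use Spark, and the `Row` class, its fields and the sample data are hypothetical. In the Spark 1.5/1.6 DataFrame API the corresponding hint is `org.apache.spark.sql.functions.broadcast(df)`, which returns a DataFrame (whether the hint survives `registerTempTable` plus a SQL query is an assumption best checked with `df.explain(true)`); `sparkContext.broadcast(...)`, by contrast, returns a `Broadcast` wrapper, not a DataFrame.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastHashJoinSketch {

    // Hypothetical row: a join key "a" (as in ON A.a = B.a) plus one payload column.
    static final class Row {
        final int a;
        final String value;
        Row(int a, String value) { this.a = a; this.value = value; }
    }

    // Build phase: index the small table B by join key. In a real broadcast
    // join, this small side is what gets copied to every task.
    static Map<Integer, List<Row>> buildHashTable(List<Row> small) {
        Map<Integer, List<Row>> table = new HashMap<>();
        for (Row r : small) {
            table.computeIfAbsent(r.a, k -> new ArrayList<>()).add(r);
        }
        return table;
    }

    // Probe phase: one pass over the large table A; A is never shuffled or sorted.
    // Emits "aValue|bValue" for every matching pair (inner-join semantics).
    static List<String> probe(List<Row> large, Map<Integer, List<Row>> table) {
        List<String> out = new ArrayList<>();
        for (Row left : large) {
            List<Row> matches = table.get(left.a);
            if (matches == null) {
                continue; // no match in B: an inner join drops the row
            }
            for (Row right : matches) {
                out.add(left.value + "|" + right.value);
            }
        }
        return out;
    }

    static List<String> join(List<Row> large, List<Row> small) {
        return probe(large, buildHashTable(small));
    }

    public static void main(String[] args) {
        List<Row> a = Arrays.asList(new Row(1, "a1"), new Row(2, "a2"), new Row(3, "a3"));
        List<Row> b = Arrays.asList(new Row(1, "b1"), new Row(3, "b3"), new Row(3, "b3x"));
        System.out.println(join(a, b));
    }
}
```

Running `main` prints `[a1|b1, a3|b3, a3|b3x]`: row 2 of A has no partner and is dropped. The whole hash table has to fit in memory on every task, which is why this only pays off when one side is small.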
Re: Mapper side join with DataFrames API
It's helpful to always include the output of df.explain(true) when you are asking about performance.

On Mon, Feb 29, 2016 at 6:14 PM, Deepak Gopalakrishnan wrote:
> Hello All,
>
> I'm trying to join two DataFrames A and B with:
>
> sqlContext.sql("SELECT * FROM A INNER JOIN B ON A.a=B.a");
>
> What I have done is register A and B as temp tables (registerTempTable)
> after loading these DataFrames from different sources. I need the join to
> be really fast, and I was wondering whether there is a way to use the SQL
> statement and still get a mapper-side join (say my table B is small)?
>
> I read some articles on using broadcast to do mapper-side joins. Could I
> do something like this and then execute my SQL statement to get a
> mapper-side join?
>
> DataFrame B = sparkContext.broadcast(B);
> B.registerTempTable("B");
>
> I have a join as stated above, and I see the following in my executor logs:
>
> 16/02/29 17:02:35 INFO TaskSetManager: Finished task 198.0 in stage 7.0 (TID 1114) in 20354 ms on localhost (196/200)
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Getting 200 non-empty blocks out of 200 blocks
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 128 blocks
> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
> 16/02/29 17:03:03 INFO Executor: Finished task 199.0 in stage 7.0 (TID 1115). 2511 bytes result sent to driver
> 16/02/29 17:03:03 INFO TaskSetManager: Finished task 199.0 in stage 7.0 (TID 1115) in 27621 ms on localhost (197/200)
>
> *16/02/29 17:07:06 INFO UnsafeExternalSorter: Thread 124 spilling sort data of 256.0 KB to disk (0 time so far)*
>
> Now, I have around 10G of executor memory, my memory fraction should be
> the default (0.75 as per the documentation), and my memory usage is
> < 1.5G (obtained from the Storage tab on the Spark dashboard), but it still
> reports spilling sort data. I'm a little surprised that this happens even
> though I have enough free memory.
>
> Any input will be greatly appreciated!
>
> Thanks
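The Storage tab figure is not the whole picture: it shows cached blocks (storage memory), while the execution memory used by sorts and joins is a separate claim on the same unified pool. A minimal back-of-the-envelope sketch of how Spark 1.6's unified memory manager carves up a heap follows; it is not Spark code. It assumes the 1.6 defaults (300 MB reserved, `spark.memory.fraction` = 0.75, `spark.memory.storageFraction` = 0.5), assumes the JVM reports the full 10G as usable heap (in practice it is usually somewhat less), and uses a hypothetical count of 8 concurrent tasks. In 1.6 each of N active tasks is capped at roughly 1/N of the memory available to execution.

```java
public class UnifiedMemoryEstimate {
    // Assumed Spark 1.6 default: heap memory reserved for Spark internals.
    static final double RESERVED_MB = 300.0;

    // Pool shared by execution (sorts, joins, shuffles) and storage (cached blocks).
    static double unifiedPoolMb(double heapMb, double memoryFraction) {
        return (heapMb - RESERVED_MB) * memoryFraction;
    }

    // Part of the unified pool where cached blocks are protected from eviction.
    static double storageRegionMb(double unifiedMb, double storageFraction) {
        return unifiedMb * storageFraction;
    }

    // Rough upper bound on execution memory one task may hold when
    // activeTasks tasks compete for executionMb of execution memory.
    static double maxPerTaskExecutionMb(double executionMb, int activeTasks) {
        return executionMb / activeTasks;
    }

    public static void main(String[] args) {
        double heapMb = 10 * 1024;  // "around 10G of executor memory"
        int activeTasks = 8;        // hypothetical number of concurrent tasks (cores)
        double unified = unifiedPoolMb(heapMb, 0.75);
        System.out.printf("unified pool:   %.1f MB%n", unified);
        System.out.printf("storage region: %.1f MB%n", storageRegionMb(unified, 0.5));
        System.out.printf("per-task execution cap (%d tasks): %.1f MB%n",
                activeTasks, maxPerTaskExecutionMb(unified, activeTasks));
    }
}
```

Under these assumptions the unified pool is 7455 MB, the protected storage region 3727.5 MB, and each of 8 tasks could hold at most about 932 MB of execution memory even with no cached data. This only shows how the pools are divided; it does not by itself explain a 256 KB spill, and the real figures depend on the actual heap size and core count.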