Re: Mapper side join with DataFrames API

2016-03-05 Thread Deepak Gopalakrishnan
Hello Guys,

No help yet. Could someone help with a reply to the SO question linked below?

Thanks
Deepak

On Fri, Mar 4, 2016 at 5:32 PM, Deepak Gopalakrishnan wrote:

> I have added this to SO; can you guys share any thoughts?
>
>
> http://stackoverflow.com/questions/35795518/spark-1-6-spills-to-disk-even-when-there-is-enough-memory
>
> On Thu, Mar 3, 2016 at 7:06 AM, Deepak Gopalakrishnan wrote:
>
>> Hello,
>>
>> I'm using 1.6.0 on EMR
>>
>> On Thu, Mar 3, 2016 at 12:34 AM, Yong Zhang wrote:
>>
>>> What version of Spark are you using?
>>>
>>> I am also trying to figure out how to do the map side join in Spark.
>>>
>>> In 1.5.x there is a broadcast function for DataFrames, but it caused an
>>> OOM in my simple test case, even though one side of the join is very small.
>>>
>>> I am still trying to find the root cause.
>>>
>>> Yong
>>>
>>> --
>>> Date: Wed, 2 Mar 2016 15:38:29 +0530
>>> Subject: Re: Mapper side join with DataFrames API
>>> From: dgk...@gmail.com
>>> To: mich...@databricks.com
>>> CC: user@spark.apache.org
>>>
>>>
>>> Thanks for the help guys.
>>>
>>> Let me ask part of my question in a slightly different way.
>>>
>>> I have attached my screenshots here. There is so much unused memory, and
>>> yet there is a spill (as shown in the screenshots). Any idea why?
>>>
>>> Thanks
>>> Deepak
>>>
>>> On Wed, Mar 2, 2016 at 5:14 AM, Michael Armbrust wrote:
>>>
>>> It's helpful to always include the output of df.explain(true) when you
>>> are asking about performance.
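>>>
>>> A minimal, untested Java sketch of doing that (assuming the Spark 1.6
>>> DataFrame API and the temp tables A and B from the question below):
>>>
>>> import org.apache.spark.sql.DataFrame;
>>>
>>> // Build the join and print the parsed, analyzed, optimized, and
>>> // physical plans. A BroadcastHashJoin node in the physical plan means
>>> // the small side is being broadcast; SortMergeJoin means a full shuffle.
>>> DataFrame joined =
>>>     sqlContext.sql("SELECT * FROM A INNER JOIN B ON A.a = B.a");
>>> joined.explain(true);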
>>>
>>> On Mon, Feb 29, 2016 at 6:14 PM, Deepak Gopalakrishnan wrote:
>>>
>>> Hello All,
>>>
>>> I'm trying to join two DataFrames, A and B, with:
>>>
>>> sqlContext.sql("SELECT * FROM A INNER JOIN B ON A.a=B.a");
>>>
>>> Now, what I have done is register temp tables for A and B after loading
>>> these DataFrames from different sources. I need the join to be really
>>> fast, and I was wondering whether there is a way to keep the SQL
>>> statement while still getting a map-side join (say, when my table B is
>>> small)?
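>>>
>>> One path I have read about that keeps the SQL statement (a hedged
>>> sketch, not verified on my data): Spark SQL broadcasts a join side
>>> automatically when its estimated size falls under
>>> spark.sql.autoBroadcastJoinThreshold. Note the estimate needs table
>>> statistics (e.g. Hive tables); plain temp tables may not qualify:
>>>
>>> // Raise the auto-broadcast threshold (in bytes); the value here is
>>> // just an example, not a recommendation.
>>> sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold",
>>>                    String.valueOf(100 * 1024 * 1024)); // ~100 MB
>>> DataFrame result =
>>>     sqlContext.sql("SELECT * FROM A INNER JOIN B ON A.a = B.a");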
>>>
>>> I read some articles on using broadcast to do map-side joins. Could I do
>>> something like the following and then execute my SQL statement to achieve
>>> a map-side join?
>>>
>>> DataFrame B = sparkContext.broadcast(B);
>>> B.registerTempTable("B");
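>>>
>>> (For reference, a minimal, untested sketch of the broadcast() hint from
>>> org.apache.spark.sql.functions, which I understand is the supported way
>>> to request this in 1.5+; A and B are the DataFrames above, joined on
>>> column "a":)
>>>
>>> import static org.apache.spark.sql.functions.broadcast;
>>> import org.apache.spark.sql.DataFrame;
>>>
>>> // Hint the planner to ship B to every executor, so the join becomes a
>>> // map-side BroadcastHashJoin with no shuffle of A.
>>> DataFrame joined = A.join(broadcast(B), A.col("a").equalTo(B.col("a")));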
>>>
>>>
>>> I have a join as stated above, and I see the following in my executor logs:
>>>
>>> 16/02/29 17:02:35 INFO TaskSetManager: Finished task 198.0 in stage 7.0
>>> (TID 1114) in 20354 ms on localhost (196/200)
>>> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Getting 200
>>> non-empty blocks out of 200 blocks
>>> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Started 0 remote
>>> fetches in 0 ms
>>> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty
>>> blocks out of 128 blocks
>>> 16/02/29 17:02:35 INFO ShuffleBlockFetcherIterator: Started 0 remote
>>> fetches in 0 ms
>>> 16/02/29 17:03:03 INFO Executor: Finished task 199.0 in stage 7.0 (TID
>>> 1115). 2511 bytes result sent to driver
>>> 16/02/29 17:03:03 INFO TaskSetManager: Finished task 199.0 in stage 7.0
>>> (TID 1115) in 27621 ms on localhost (197/200)
>>>
>>> *16/02/29 17:07:06 INFO UnsafeExternalSorter: Thread 124 spilling sort
>>> data of 256.0 KB to disk (0 time so far)*
>>>
>>>
>>> Now, I have around 10 GB of executor memory, my memory fraction should be
>>> the default (0.75 per the documentation), and my memory usage is under
>>> 1.5 GB (from the Storage tab on the Spark dashboard), but it still spills
>>> sort data. I'm a little surprised this happens even when I have plenty of
>>> free memory.
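>>>
>>> In case it helps others, a hypothetical tuning sketch (untested; the
>>> values are guesses, not a diagnosis of the spill) of the Spark 1.6
>>> unified-memory settings that size the pool the sorter draws from:
>>>
>>> import org.apache.spark.SparkConf;
>>>
>>> // Fraction of the heap shared by execution and storage (1.6 default
>>> // 0.75), and the share of that pool reserved for storage (default 0.5).
>>> SparkConf conf = new SparkConf()
>>>     .set("spark.memory.fraction", "0.75")
>>>     .set("spark.memory.storageFraction", "0.5");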
>>> Any inputs will be greatly appreciated!
>>> Thanks
>>> --
>>> Regards,
>>> *Deepak Gopalakrishnan*
>>> *Mobile*:+918891509774
>>> *Skype* : deepakgk87
>>> http://myexps.blogspot.com


-- 
Regards,
*Deepak Gopalakrishnan*
*Mobile*:+918891509774
*Skype* : deepakgk87
http://myexps.blogspot.com

