DataFrame.toJavaRDD cause fetching data to driver, is it expected ?

2015-11-04 Thread Aliaksei Tsyvunchyk
Hello folks,

Recently I noticed unexpectedly heavy network traffic between the driver program 
and the worker node.
While debugging, I found that it is caused by the following block of 
code:

—— Java ——— — 
DataFrame etpvRecords = context.sql(" SOME SQL query here");
Mapper m = new Mapper(localValue, ProgramId::toProgId);
return etpvRecords
        .toJavaRDD()                  // DataFrame -> JavaRDD<Row>
        .map(m::mapHutPutViewingRow)  // custom per-row mapping
        .reduce(Reducer::reduce);     // combine mapped values into one result
—— Java  

I’m using a debugger breakpoint and OS X nettop to monitor traffic between the 
processes. Before execution reaches the toJavaRDD() line I see 500 Kb of traffic, 
and after that line executes I see 2.2 Mb. But when I check the size of the 
result of the reduce function, it is only 10 Kb.
So .toJavaRDD() seems to cause the worker process to return the dataset to the 
driver process, and the subsequent map/reduce seems to run on the driver. 

I definitely did not expect this, so I have 2 questions.
1.  Is it really expected behavior that DataFrame.toJavaRDD causes the whole 
dataset to be returned to the driver, or am I doing something wrong?
2.  What is the expected way to transform a DataFrame using custom 
Java map/reduce functions when the standard SQL features do not meet all my 
needs?

Env: Spark 1.5.1 in standalone mode (1 master, 1 worker sharing the same machine), 
Java 1.8.0_60.
-- 


CONFIDENTIALITY NOTICE: This email and files attached to it are 
confidential. If you are not the intended recipient you are hereby notified 
that using, copying, distributing or taking any action in reliance on the 
contents of this information is strictly prohibited. If you have received 
this email in error please notify the sender and delete this email.


Re: DataFrame.toJavaRDD cause fetching data to driver, is it expected ?

2015-11-04 Thread Romi Kuntsman
I noticed that toJavaRDD causes a computation on the DataFrame, so is it
considered an action, even though logically it's a transformation?
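
On the transformation-versus-action distinction itself, a plain-Java analogy may help. This is only a sketch: it uses java.util.stream in place of an RDD, it contains no Spark code, and it says nothing about what toJavaRDD does internally. The class name LazyPipelineSketch and the mapRow helper are hypothetical, made up for illustration. The point is that a lazy step such as map() does no work until a terminal step such as reduce() asks for a result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Plain-Java analogy only: java.util.stream stands in for an RDD here.
// No Spark classes are used, and nothing below runs on a cluster.
class LazyPipelineSketch {

    // Counts how many times the mapper function has actually been invoked.
    static final AtomicInteger mapCalls = new AtomicInteger();

    // Hypothetical mapper, standing in for m::mapHutPutViewingRow.
    static int mapRow(String row) {
        mapCalls.incrementAndGet();
        return row.length();
    }

    // Builds the pipeline but does not run it: map() is an intermediate
    // (lazy) operation, comparable to a transformation.
    static Stream<Integer> buildPipeline(List<String> rows) {
        return rows.stream().map(LazyPipelineSketch::mapRow);
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "bb", "ccc");

        Stream<Integer> pipeline = buildPipeline(rows);
        System.out.println("map calls after building: " + mapCalls.get());

        // reduce() is a terminal operation, comparable to an action:
        // only now is the mapper applied to every element.
        int total = pipeline.reduce(0, Integer::sum);
        System.out.println("total = " + total
                + ", map calls after reduce: " + mapCalls.get());
    }
}
```

If a step that is expected to be lazy shows a non-zero call count before the terminal step runs, that step is doing eager work; the same kind of counter-based check could in principle be applied to a real pipeline.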

Re: DataFrame.toJavaRDD cause fetching data to driver, is it expected ?

2015-11-04 Thread Aliaksei Tsyvunchyk
Hello Romi,

Do you mean that in my particular case I’m causing computation on the DataFrame, 
or is this the regular behavior of DataFrame.toJavaRDD? 
If it’s the regular behavior, do you know which approach could be used to perform 
map/reduce on a DataFrame without causing it to load all the data into the driver program?



Re: DataFrame.toJavaRDD cause fetching data to driver, is it expected ?

2015-11-04 Thread Romi Kuntsman
In my program I move between RDD and DataFrame several times.
I know that the entire data of the DF doesn't go into the driver because it
wouldn't fit there.
But calling toJavaRDD does cause computation.

Check the number of partitions you have on the DF and RDD...


Re: DataFrame.toJavaRDD cause fetching data to driver, is it expected ?

2015-11-04 Thread Aliaksei Tsyvunchyk
Hi Romi,

Thanks for the pointer. I’m quite new to Spark and not sure how checking the 
number of partitions in the DF and RDD will help, so if you can give me some 
explanation it would be really helpful. A link to documentation would also help.
