Re: map vs mapPartitions

2015-06-25 Thread Corey Nolet
Also,

I've noticed that .map() actually creates a MapPartitionsRDD under the
hood, so I think the real difference is just in the API that's being
exposed. You can do a .map() and not have to think about partitions at
all, or you can do a .mapPartitions() and be able to do things like chunking
the data in the partition (fetching more than one record at a time).
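
To make the relationship between the two calls concrete, here is a minimal plain-Java sketch. MiniRdd is a hypothetical stand-in, not a Spark class, and unlike Spark it materializes each partition eagerly to keep the model short. It only illustrates the idea that map can be written as a thin wrapper that lifts a per-record function to a per-partition one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of an RDD as a list of partitions; none of this is Spark code.
class MiniRdd<T> {
    final List<List<T>> partitions;

    MiniRdd(List<List<T>> partitions) {
        this.partitions = partitions;
    }

    // Calls f once per partition, handing it an iterator over that partition's records.
    // The results are collected into lists only to keep this model simple.
    <U> MiniRdd<U> mapPartitions(Function<Iterator<T>, Iterator<U>> f) {
        List<List<U>> out = new ArrayList<>();
        for (List<T> partition : partitions) {
            List<U> mapped = new ArrayList<>();
            Iterator<U> it = f.apply(partition.iterator());
            while (it.hasNext()) {
                mapped.add(it.next());
            }
            out.add(mapped);
        }
        return new MiniRdd<>(out);
    }

    // map wraps the per-record function in a lazy iterator and delegates to mapPartitions.
    <U> MiniRdd<U> map(final Function<T, U> f) {
        return mapPartitions(in -> new Iterator<U>() {
            @Override
            public boolean hasNext() {
                return in.hasNext();
            }

            @Override
            public U next() {
                return f.apply(in.next());
            }
        });
    }

    public static void main(String[] args) {
        MiniRdd<String> lines = new MiniRdd<>(Arrays.asList(
                Arrays.asList("a", "bb"),
                Arrays.asList("ccc")));
        MiniRdd<Integer> lengths = lines.map(String::length);
        System.out.println(lengths.partitions); // prints [[1, 2], [3]]
    }
}
```

In this model the per-record function is still applied one record at a time; only the entry point differs.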



Re: map vs mapPartitions

2015-06-25 Thread Corey Nolet
I don't know exactly what's going on under the hood, but I would not assume
that just because a whole partition is not being pulled into memory at one
time, each record is being pulled one at a time. That's the
beauty of exposing Iterators and Iterables in an API rather than collections:
there's a bunch of buffering that can be hidden from the user to make the
iterations as efficient as they can be.
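
As an illustration of buffering hidden behind an iterator, here is a small plain-Java sketch. BufferedRecords, its in-memory "store", and the fetches counter are all hypothetical; this is not how Spark or Hadoop implement it. The caller sees one record per next() call while the iterator reads from its source in blocks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical iterator that serves single records but reads its source in blocks.
class BufferedRecords implements Iterator<String> {
    private final List<String> store;   // stands in for a slow source such as disk or network
    private final int blockSize;        // records read per bulk fetch; assumed to be > 0
    private int nextOffset = 0;         // index of the next unread record in the store
    private Iterator<String> buffer = Collections.emptyIterator();
    int fetches = 0;                    // number of bulk reads performed so far

    BufferedRecords(List<String> store, int blockSize) {
        this.store = store;
        this.blockSize = blockSize;
    }

    @Override
    public boolean hasNext() {
        // Refill the buffer only when it is empty and the source has more records.
        if (!buffer.hasNext() && nextOffset < store.size()) {
            int end = Math.min(nextOffset + blockSize, store.size());
            buffer = new ArrayList<>(store.subList(nextOffset, end)).iterator();
            nextOffset = end;
            fetches++;
        }
        return buffer.hasNext();
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return buffer.next();
    }

    public static void main(String[] args) {
        BufferedRecords records =
                new BufferedRecords(Arrays.asList("a", "b", "c", "d", "e"), 2);
        int count = 0;
        while (records.hasNext()) {
            records.next();
            count++;
        }
        System.out.println(count + " records, " + records.fetches + " fetches");
        // prints 5 records, 3 fetches
    }
}
```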



Re: map vs mapPartitions

2015-06-25 Thread Shushant Arora
Yes, one partition per core, and mapPartitions applies the function to each
partition.

The question is: does the complete partition load into memory so that the
function can be applied to it, or is it an iterator where iterator.next()
loads the next record? And if it is an iterator, how is it more efficient
than map, which also works on one record at a time?

Is the only difference that the while loop below runs once per record, as in
map, while any code above it runs once per partition?

public Iterable<Integer> call(Iterator<String> input)
        throws Exception {
    List<Integer> output = new ArrayList<Integer>();
    while (input.hasNext()) {
        output.add(input.next().length());
    }
    return output;
}

So if I don't have any heavy code above the while loop, performance will be
the same as with map.
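
The difference asked about here can be sketched in plain Java. SetupCost, openResource, and the setups counter are hypothetical names used only for illustration; no Spark classes are involved. The sketch counts how often an expensive setup step runs when it sits inside a per-record function versus above the loop of a per-partition function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical model comparing per-record and per-partition setup cost.
class SetupCost {
    static int setups = 0;  // how many times the expensive setup has run

    // Stands in for something costly, such as opening a database connection.
    static void openResource() {
        setups++;
    }

    // map-style: the function only sees one record, so setup inside it runs per record.
    static int perRecord(String line) {
        openResource();
        return line.length();
    }

    // mapPartitions-style: setup runs once, then the loop runs once per record.
    // Note that collecting into a list keeps the whole partition's output in memory.
    static List<Integer> perPartition(Iterator<String> input) {
        openResource();
        List<Integer> output = new ArrayList<>();
        while (input.hasNext()) {
            output.add(input.next().length());
        }
        return output;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "bb", "ccc"),
                Arrays.asList("dd", "e", "ffff"));

        setups = 0;
        for (List<String> partition : partitions) {
            for (String line : partition) {
                perRecord(line);
            }
        }
        System.out.println("per record: " + setups);    // prints per record: 6

        setups = 0;
        for (List<String> partition : partitions) {
            perPartition(partition.iterator());
        }
        System.out.println("per partition: " + setups); // prints per partition: 2
    }
}
```

If openResource() is removed from both functions, the two do the same per-record work, which matches the expectation that performance would then be about the same.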





Re: map vs mapPartitions

2015-06-25 Thread Hao Ren
It's not the number of executors that matters, but the number of CPU cores
in your cluster.

Each partition will be loaded on a core for computing.

E.g., a cluster of 3 nodes has 24 cores, and you divide the RDD into 24
partitions (24 tasks for a narrow dependency).
Then all 24 partitions will be loaded onto your cluster in parallel, one
on each core.
You may notice that some tasks finish more quickly than others, so
divide the RDD into (2~3) x (# of cores) partitions for better pipeline
performance.
Say we have 72 partitions in the RDD: initially 24 tasks run on 24
cores, then it is first done, first served until all 72 tasks are processed.

Back to your original question: map and mapPartitions are both
transformations, but at different granularity.
map => apply the function to each record in each partition.
mapPartitions => apply the function to each partition.
But the rule is the same: one partition per core at a time.

Hope it helps.
Hao
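
The scheduling argument above can be checked with a small simulation. This is a sketch under assumed numbers: TaskScheduling, the task durations, and the two-core setup are made up for illustration, and each free core simply takes the next task in order. It is not Spark's scheduler.

```java
import java.util.PriorityQueue;

// Hypothetical "first done, first served" model: one task per core at a time.
class TaskScheduling {

    // Minimum number of rounds needed if every core runs one task per round.
    static int waves(int partitions, int cores) {
        return (partitions + cores - 1) / cores;
    }

    // Total wall-clock time when each task goes to whichever core frees up first.
    static long makespan(long[] taskDurations, int cores) {
        PriorityQueue<Long> freeAt = new PriorityQueue<>();
        for (int i = 0; i < cores; i++) {
            freeAt.add(0L);
        }
        long finish = 0;
        for (long duration : taskDurations) {
            long end = freeAt.poll() + duration;  // earliest free core takes the task
            finish = Math.max(finish, end);
            freeAt.add(end);
        }
        return finish;
    }

    public static void main(String[] args) {
        System.out.println(waves(72, 24));                                  // prints 3
        // Same total work (12 units) on 2 cores, split coarsely and finely.
        System.out.println(makespan(new long[] {9, 3}, 2));                 // prints 9
        System.out.println(makespan(new long[] {3, 3, 3, 1, 1, 1}, 2));     // prints 6
    }
}
```

With two uneven partitions one core sits idle while the other finishes; with more, smaller partitions the idle core picks up the remaining tasks.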






-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Re: map vs mapPartitions

2015-06-25 Thread Shushant Arora
Then how is mapPartitions faster than map?



Re: map vs mapPartitions

2015-06-25 Thread Daniel Darabos
Spark creates a RecordReader and calls next() on it when you call
input.next(). (See
https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L215)
How the RecordReader works is an HDFS question, but it's safe to say there is
no difference between using map and mapPartitions in this respect.



Re: map vs mapPartitions

2015-06-25 Thread Shushant Arora
Say the source is HDFS and the file is divided into 10 partitions. What will
input contain?

public Iterable<Integer> call(Iterator<String> input)

Say I have 10 executors in the job, each holding a single partition.

Will input hold part of the partition or the complete partition? And if only
part, will calling input.next() fetch the rest, or how is it handled?







Re: map vs mapPartitions

2015-06-25 Thread Sean Owen
No, or at least, it depends on how the source of the partitions was implemented.

On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora wrote:
> Does mapPartitions keep complete partitions in the executor's memory as an
> iterable?
>
> JavaRDD<String> rdd = jsc.textFile("path");
> JavaRDD<Integer> output = rdd.mapPartitions(new
>         FlatMapFunction<Iterator<String>, Integer>() {
>
>     public Iterable<Integer> call(Iterator<String> input)
>             throws Exception {
>         List<Integer> output = new ArrayList<Integer>();
>         while (input.hasNext()) {
>             output.add(input.next().length());
>         }
>         return output;
>     }
>
> });
>
>
> Here, is input held in memory, and can it contain a complete partition of
> several GBs?
> Will call(Iterator<String> input) be called only once per partition
> (say 10 times in this example), and not once per line (say 1000 times)?
>
>
> And what's the use of mapPartitionsWithIndex?
>
> Thanks
>
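
On the last question in the quoted message: mapPartitionsWithIndex additionally passes the partition's index to the function, which helps when one partition needs special treatment, for example dropping a header line that lives only in partition 0. The following is a plain-Java model of that idea; WithIndexModel and skipHeader are hypothetical names, and the signature shown is not Spark's.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical stand-in for mapPartitionsWithIndex; no Spark classes are used.
class WithIndexModel {

    // Calls f once per partition with the partition number and that partition's iterator.
    static <T, U> List<U> mapPartitionsWithIndex(
            List<List<T>> partitions,
            BiFunction<Integer, Iterator<T>, Iterator<U>> f) {
        List<U> out = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            Iterator<U> it = f.apply(i, partitions.get(i).iterator());
            while (it.hasNext()) {
                out.add(it.next());
            }
        }
        return out;
    }

    // Drops the first record of partition 0 only, e.g. a CSV header line.
    static Iterator<String> skipHeader(int index, Iterator<String> lines) {
        if (index == 0 && lines.hasNext()) {
            lines.next();
        }
        return lines;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("name,age", "ann,31"),
                Arrays.asList("bob,27", "cy,45"));
        List<String> rows = mapPartitionsWithIndex(partitions, WithIndexModel::skipHeader);
        System.out.println(rows); // prints [ann,31, bob,27, cy,45]
    }
}
```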
