Re: Re: repartitionAndSortWithinPartitions task shuffle phase is very slow

2015-10-30 Thread Luke Han
We would love to hear any suggestions or comments on our implementation.

Does anyone have experience with this?

Thanks.


Best Regards!
-

Luke Han



Re: Re: repartitionAndSortWithinPartitions task shuffle phase is very slow

2015-10-26 Thread 周千昊
I have replaced the default Java serialization with Kryo.
It did reduce the shuffle size, and performance has improved; however,
the shuffle speed remains unchanged.
I am quite new to Spark. Does anyone have an idea of which direction I
should take to find the root cause?

Best Regards
ZhouQianhao


Re: Re: repartitionAndSortWithinPartitions task shuffle phase is very slow

2015-10-23 Thread 周千昊
We have not tried that yet; however, both the MR and the Spark
implementations were tested with the same number of partitions on the same
cluster.

On Fri, Oct 23, 2015 at 5:21 PM, 250635...@qq.com <250635...@qq.com> wrote:

> Hi,
>
> I am not an expert on this kind of implementation, but looking at the
> performance results: do the map-side partitions fit the different
> datasets? Have you tried increasing the number of partitions?
>
> 250635...@qq.com
>
> From: Li Yang
> Date: 2015-10-23 16:17
> To: dev
> CC: Reynold Xin; dev@spark.apache.org
> Subject: Re: repartitionAndSortWithinPartitions task shuffle phase is very
> slow
> Any advice on how to tune the repartitionAndSortWithinPartitions stage?
> Are there any particular metrics or parameters to look into? Basically,
> Spark and MR shuffle the same amount of data, because we more or less
> copied the MR implementation into Spark.
>
> Let us know if more info is needed.
>


Re: repartitionAndSortWithinPartitions task shuffle phase is very slow

2015-10-23 Thread Li Yang
Any advice on how to tune the repartitionAndSortWithinPartitions stage?
Are there any particular metrics or parameters to look into? Basically,
Spark and MR shuffle the same amount of data, because we more or less
copied the MR implementation into Spark.

Let us know if more info is needed.



Re: repartitionAndSortWithinPartitions task shuffle phase is very slow

2015-10-22 Thread Reynold Xin
Why do you do a glom? It seems unnecessarily expensive to materialize each
partition in memory.
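
To illustrate the concern about glom(), here is a minimal plain-Java sketch (no Spark dependency) contrasting the two ways of processing a partition. The class and method names are hypothetical and only for illustration: `materializeThenMap` mimics what glom() implies (copy the whole partition into memory first), while `streamingMap` transforms records one at a time, which is how a function passed to mapPartitions can consume the iterator it receives. Whether streaming is feasible for the mapper in this thread depends on logic the thread does not show.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class GlomVsStreaming {
    /** glom-like: copy the whole partition into a list before doing any work. */
    public static <T, R> Iterator<R> materializeThenMap(Iterator<T> partition, Function<T, R> f) {
        List<T> all = new ArrayList<>();
        while (partition.hasNext()) {
            all.add(partition.next());   // the entire partition is now held in memory
        }
        List<R> out = new ArrayList<>(all.size());
        for (T record : all) {
            out.add(f.apply(record));
        }
        return out.iterator();
    }

    /** Streaming: wrap the iterator so each record is transformed only when requested. */
    public static <T, R> Iterator<R> streamingMap(final Iterator<T> partition, final Function<T, R> f) {
        return new Iterator<R>() {
            @Override public boolean hasNext() { return partition.hasNext(); }
            @Override public R next() { return f.apply(partition.next()); }
        };
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("a", "b", "c");
        Iterator<String> eager = materializeThenMap(partition.iterator(), String::toUpperCase);
        Iterator<String> lazy = streamingMap(partition.iterator(), String::toUpperCase);
        // Both variants yield the same records; they differ only in peak memory use.
        while (eager.hasNext()) {
            System.out.println(eager.next() + " " + lazy.next());
        }
    }
}
```

Both variants produce identical output, so switching between them changes memory behavior rather than results.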


On Thu, Oct 22, 2015 at 2:02 AM, 周千昊 <qhz...@apache.org> wrote:

> Hi, Spark community,
>   I have an application that I am trying to migrate from MR to Spark.
>   It runs some calculations on data from Hive and writes HFiles, which
> are then bulk loaded into an HBase table. Details as follows:
>
>  Rdd input = getSourceInputFromHive()
>  Rdd<Tuple2<byte[], byte[]>> mapSideResult =
>      input.glom().mapPartitions(/* some calculation, equivalent to MR mapper */)
>  // PS: the result in each partition has already been sorted in
>  // lexicographical order during the calculation
>  mapSideResult
>      .repartitionAndSortWithinPartitions(/* partition with byte[][] which is
>          the HTable split keys, equivalent to MR shuffle */)
>      .map(/* transform Tuple2<byte[], byte[]> to
>          Tuple2<ImmutableBytesWritable, KeyValue>, equivalent to MR reducer
>          without output */)
>      .saveAsNewAPIHadoopFile(/* write to hfile */)
>
>   This all works fine on a small dataset, where Spark outruns MR by
> about 10%. However, when I apply it to a dataset of 150 million records,
> MR is about twice as fast as Spark (*MR 25 min, Spark 50 min*).
>   After exploring the application UI, I found that the
> repartitionAndSortWithinPartitions stage is very slow: in the shuffle
> phase, a 6 GB shuffle takes about 18 min, which is quite unreasonable.
>   *Can anyone help with this issue and give me some advice? It's not
> iterative processing, but I believe Spark should be at least as fast as
> MR.*
>
>   Here is the cluster info:
>   vm: 8 nodes * (128G mem + 64 core)
>   hadoop cluster: hdp 2.2.6
>   spark running mode: yarn-client
>   spark version: 1.5.1
>
>
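
The "partition with byte[][] which is HTable split key" step in the quoted pseudocode can be sketched in plain Java as follows. This is a minimal illustration, not the poster's code: the class name `SplitKeyPartitioner` is hypothetical, and it assumes the split keys are sorted and that each range runs from its start key (inclusive) to the next split key (exclusive). In Spark the same logic would sit inside a subclass of `org.apache.spark.Partitioner`; that wiring is omitted so the sketch has no dependencies.

```java
import java.nio.charset.StandardCharsets;

public class SplitKeyPartitioner {
    // Assumed to be sorted in unsigned lexicographic order.
    private final byte[][] splitKeys;

    public SplitKeyPartitioner(byte[][] splitKeys) {
        this.splitKeys = splitKeys;
    }

    /** n split keys delimit n + 1 key ranges. */
    public int numPartitions() {
        return splitKeys.length + 1;
    }

    /** Index of the range containing key, i.e. the number of split keys <= key. */
    public int getPartition(byte[] key) {
        int lo = 0;
        int hi = splitKeys.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (compareUnsigned(splitKeys[mid], key) <= 0) {
                lo = mid + 1;   // this split key is <= key, so the key lies further right
            } else {
                hi = mid;       // this split key is > key, so the key lies in an earlier range
            }
        }
        return lo;
    }

    /** Lexicographic comparison that treats each byte as unsigned. */
    public static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;   // a shorter array sorts before its extensions
    }

    private static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[][] splits = { bytes("g"), bytes("n"), bytes("t") };
        SplitKeyPartitioner p = new SplitKeyPartitioner(splits);
        for (String k : new String[] {"apple", "g", "kiwi", "zebra"}) {
            System.out.println(k + " -> partition " + p.getPartition(bytes(k)));
        }
    }
}
```

Because Java's `byte[]` has no natural ordering, a comparator along the lines of `compareUnsigned` would also be needed for the within-partition sort.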


Re: repartitionAndSortWithinPartitions task shuffle phase is very slow

2015-10-22 Thread 周千昊
+kylin dev list



Re: repartitionAndSortWithinPartitions task shuffle phase is very slow

2015-10-22 Thread 周千昊
Hi Reynold,
  We use glom() because it makes it easy to reuse the calculation logic
already implemented in MR. And to be clear, we are still at the POC stage.
  Since the results show almost no difference between this glom stage and
the MR mapper, glom is probably not the issue.
  I monitored the network traffic while the repartition was running:
traffic peaked at about 200-300 MB/s, but stayed at about 3-4 MB/s for a
long time. Do you have any idea why?
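
As a rough sanity check on these numbers, the arithmetic below relates the observed rates to the 6 GB, 18 min shuffle reported in the original post. It is only a sketch: the class name is hypothetical, 1 GB is taken as 1024 MB, and it assumes the monitored traffic and the 6 GB figure describe the same aggregate transfer, which the thread does not state.

```java
public class ShuffleThroughput {
    /** Average throughput in MB/s for a transfer of the given size and duration. */
    public static double averageMBps(double gigabytes, double minutes) {
        return gigabytes * 1024.0 / (minutes * 60.0);
    }

    /** Minutes needed to move the given volume at a fixed rate in MB/s. */
    public static double minutesAt(double gigabytes, double mbPerSecond) {
        return gigabytes * 1024.0 / mbPerSecond / 60.0;
    }

    public static void main(String[] args) {
        // 6 GB shuffled in 18 min, as reported for the slow stage.
        System.out.printf("average rate: %.1f MB/s%n", averageMBps(6, 18));
        // Time the same volume would take at the observed peak and plateau rates.
        System.out.printf("at 200 MB/s: %.1f min%n", minutesAt(6, 200));
        System.out.printf("at 4 MB/s:   %.1f min%n", minutesAt(6, 4));
    }
}
```

Under those assumptions the average works out to roughly 5.7 MB/s, far closer to the 3-4 MB/s plateau than to the peak, which suggests the stage spends most of its time at the low rate rather than being limited by network bandwidth.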
