Re: Re: repartitionAndSortWithinPartitions task shuffle phase is very slow

2015-10-30 Thread Luke Han
Would love to have any suggestions or comments about our implementation.

Is there anyone who has experience with this?

Thanks.


Best Regards!
-

Luke Han

On Tue, Oct 27, 2015 at 10:33 AM, 周千昊 <z.qian...@gmail.com> wrote:

> I have replaced the default Java serialization with Kryo.
> It did reduce the shuffle size and the overall performance has improved;
> however, the shuffle speed remains unchanged.
> I am quite new to Spark. Does anyone have an idea about which direction I
> should take to find the root cause?
>
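For reference, switching Spark 1.5 to Kryo looks roughly like the sketch below; the registered classes are only illustrative, borrowed from the (ImmutableBytesWritable, KeyValue) output types described further down the thread.

    import org.apache.spark.SparkConf
    import org.apache.hadoop.hbase.KeyValue
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable

    // Use Kryo for serialization and register the application's record classes
    // (here, the HBase output types mentioned later in the thread) so Kryo does
    // not have to write full class names into every record.
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(
        classOf[ImmutableBytesWritable],
        classOf[KeyValue]))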
> 周千昊 <qhz...@apache.org> wrote on Fri, Oct 23, 2015 at 5:50 PM:
>
> > We have not tried that yet; however, both the MR and Spark implementations
> > were tested with the same number of partitions on the same cluster.
> >
> >> 250635...@qq.com <250635...@qq.com> wrote on Fri, Oct 23, 2015 at 5:21 PM:
> >
> >> Hi,
> >>
> >> Not an expert on this kind of implementation, but looking at the
> >> performance result: are the map-side partitions sized appropriately for
> >> the different datasets? Have you tried increasing the number of
> >> partitions?
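Checking and widening the map-side partition count is cheap to try. A rough sketch, where `input` refers to the RDD in the pipeline quoted at the bottom of this thread and 512 is an arbitrary example value:

    // How many partitions does the Hive input currently have?
    println(input.partitions.length)

    // If that is small relative to the cluster (8 nodes x 64 cores here),
    // repartitioning to a larger count spreads the map-side work more evenly.
    val widened = input.repartition(512)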
> >>
> >> 250635...@qq.com
> >>
> >> From: Li Yang
> >> Date: 2015-10-23 16:17
> >> To: dev
> >> CC: Reynold Xin; dev@spark.apache.org
> >> Subject: Re: repartitionAndSortWithinPartitions task shuffle phase is
> >> very slow
> >> Any advice on how to tune the repartitionAndSortWithinPartitions stage?
> >> Any particular metrics or parameters to look into? Basically Spark and MR
> >> shuffle the same amount of data, because we essentially copied the MR
> >> implementation into Spark.
> >>
> >> Let us know if more info is needed.
> >>
> >> On Fri, Oct 23, 2015 at 10:24 AM, 周千昊 <qhz...@apache.org> wrote:
> >>
> >> > +kylin dev list
> >> >
> >> > 周千昊 <qhz...@apache.org> wrote on Fri, Oct 23, 2015 at 10:20 AM:
> >> >
> >> > > Hi, Reynold
> >> > >   We use glom() because it makes it easy to adapt the calculation
> >> > > logic already implemented in MR. And to be clear, we are still in POC.
> >> > >   Since the results show there is almost no difference between this
> >> > > glom stage and the MR mapper, using glom here might not be the issue.
> >> > >   I was trying to monitor the network traffic when the repartition
> >> > > happens, and it showed that the traffic peaks at about 200-300 MB/s
> >> > > while it stays at about 3-4 MB/s for a long time. Have you got any
> >> > > idea about that?
> >> > >
> >> > > Reynold Xin <r...@databricks.com> wrote on Fri, Oct 23, 2015 at 2:43 AM:
> >> > >
> >> > >> Why do you do a glom? It seems unnecessarily expensive to
> materialize
> >> > >> each partition in memory.
> >> > >>
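One way to act on this point, assuming the existing MR mapper logic can consume an iterator, is to drop glom() and let mapPartitions stream each partition instead of materializing it as a single array. A sketch, where computeLikeMrMapper is a hypothetical stand-in for that logic and the element type String is assumed for illustration:

    import org.apache.spark.rdd.RDD

    // Hypothetical stand-in for the existing MR mapper logic: consume the rows
    // of one partition lazily and emit already-sorted (key, value) byte pairs.
    def computeLikeMrMapper(rows: Iterator[String]): Iterator[(Array[Byte], Array[Byte])] = ???

    // Streaming version of the map side: no glom(), so a partition is never
    // held in memory as one array.
    def mapSide(input: RDD[String]): RDD[(Array[Byte], Array[Byte])] =
      input.mapPartitions(rows => computeLikeMrMapper(rows))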
> >> > >>
> >> > >> On Thu, Oct 22, 2015 at 2:02 AM, 周千昊 <qhz...@apache.org> wrote:
> >> > >>
> >> > >>> Hi, Spark community
> >> > >>>   I have an application which I am trying to migrate from MR to Spark.
> >> > >>>   It does some calculations on data from Hive and outputs HFiles,
> >> > >>> which are then bulk loaded into an HBase table. Details as follows:
> >> > >>>
> >> > >>>   RDD input = getSourceInputFromHive()
> >> > >>>   RDD<Tuple2<byte[], byte[]>> mapSideResult =
> >> > >>>       input.glom().mapPartitions(/* some calculation, equivalent to the MR mapper */)
> >> > >>>   // PS: the result in each partition has already been sorted in
> >> > >>>   // lexicographical order during the calculation
> >> > >>>   mapSideResult
> >> > >>>       .repartitionAndSortWithinPartitions(/* partitioned by the byte[][] HTable split keys, equivalent to the MR shuffle */)
> >> > >>>       .map(/* transform Tuple2<byte[], byte[]> to Tuple2<ImmutableBytesWritable, KeyValue>, equivalent to the MR reducer without output */)
> >> > >>>       .saveAsNewAPIHadoopFile(/* write to HFile */)
> >> > >>>
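A minimal sketch (not the author's actual code) of the shuffle step implied above: splitKeys is assumed to hold the HTable region split keys, and the implicit Ordering is what repartitionAndSortWithinPartitions uses to sort the byte[] keys lexicographically, matching HBase's row-key order.

    import org.apache.spark.Partitioner
    import org.apache.hadoop.hbase.util.Bytes

    // Route each key to the HBase region whose split-key range contains it,
    // so the job produces exactly one sorted partition per region.
    class RegionPartitioner(splitKeys: Array[Array[Byte]]) extends Partitioner {
      override def numPartitions: Int = splitKeys.length + 1
      override def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[Array[Byte]]
        val idx = splitKeys.indexWhere(split => Bytes.compareTo(k, split) < 0)
        if (idx < 0) splitKeys.length else idx
      }
    }

    // Lexicographic byte[] ordering, the same order HFiles expect.
    implicit val bytesOrdering: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
      def compare(a: Array[Byte], b: Array[Byte]): Int = Bytes.compareTo(a, b)
    }

    val partitioned = mapSideResult
      .repartitionAndSortWithinPartitions(new RegionPartitioner(splitKeys))
    // partitioned can then be mapped to (ImmutableBytesWritable, KeyValue) pairs
    // and written out with saveAsNewAPIHadoopFile, as in the pipeline above.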
> >> > >>>   This all works fine on a small dataset, and Spark outruns MR by
> >> > >>> about 10%. However, when I apply it to a dataset of 150 million
> >> > >>> records, MR is about 100% faster than Spark (*MR 25 min, Spark 50 min*).
> >> > >>>   After exploring the application UI, it shows that the
> >> > >>> repartitionAndSortWithinPartitions stage is very slow, and in the
> >> > >>> shuffle phase a 6 GB shuffle costs about 18 min, which seems quite
> >> > >>> unreasonable.
> >> > >>>   *Can anyone help with this issue and give me some advice on this?
> >> > >>> It is not iterative processing, but I believe Spark should be at
> >> > >>> least as fast.*
> >> > >>>
> >> > >>>   Here are the cluster info:
> >> > >>>   vm: 8 nodes * (128G mem + 64 core)
> >> > >>>   hadoop cluster: hdp 2.2.6
> >> > >>>   spark running mode: yarn-client
> >> > >>>   spark version: 1.5.1
> >> > >>>
> >> > >>>
> >> > >>
> >> >
> >>
> > --
> Best Regard
> ZhouQianhao
>

