Hi Xiangrui,

Thanks for your reply!

Yes, our data is very sparse, but RDD.repartition invokes
RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has
the same effect as #2, right?
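
Just to make sure we are talking about the same thing, this is the
equivalence I mean (a minimal sketch, with data standing in for our RDD of
labeled points and numPartitions for the target count):

// In Spark, RDD.repartition(n) is defined as coalesce(n, shuffle = true),
// so these two should produce the same shuffled result:
val a = data.repartition(numPartitions)
val b = data.coalesce(numPartitions, shuffle = true)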

For CombineInputFormat, I haven't tried it yet, but it sounds like it will
combine multiple partitions into one large partition, so if I cache that,
won't I hit the same issues as #1?
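
If I understand the suggestion, the usage would look roughly like the
sketch below (untested; I am assuming Hadoop 2.x's CombineTextInputFormat,
a "label index:value index:value ..." text format with 0-based indices,
and the path and split size are just placeholders):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Ask the input format to pack multiple HDFS blocks into one split (~1GB here).
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.maxsize", 1024L * 1024 * 1024)

val lines = sc.newAPIHadoopFile[LongWritable, Text, CombineTextInputFormat](
  "hdfs:///path/to/training/data").map(_._2.toString)

// Parse each "label index:value index:value ..." line into a sparse LabeledPoint.
val points = lines.map { line =>
  val parts = line.trim.split(' ')
  val (indices, values) = parts.tail.map { kv =>
    val Array(i, v) = kv.split(':')
    (i.toInt, v.toDouble)
  }.unzip
  LabeledPoint(parts.head.toDouble, Vectors.sparse(100000000, indices, values))
}.cache()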

For coalesce, could you share some best practices on how to choose the right
number of partitions to avoid the locality problem?
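
(Right now we are just guessing at something like the sketch below, where
the executor numbers are made up; not sure whether that is the right way to
think about it.)

// Rough guess: a small multiple of the total executor cores, so every core
// gets work without creating too many tiny tasks. Numbers here are made up.
val numExecutors = 50
val coresPerExecutor = 4
val numPartitions = numExecutors * coresPerExecutor * 3
val repartitioned = data.coalesce(numPartitions, shuffle = true).cache()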

Thanks!



On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng <men...@gmail.com> wrote:

> Assuming that your data is very sparse, I would recommend
> RDD.repartition. But if it is not the case and you don't want to
> shuffle the data, you can try a CombineInputFormat and then parse the
> lines into labeled points. Coalesce may cause locality problems if you
> didn't use the right number of partitions. -Xiangrui
>
> On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong <dong...@gmail.com>
> wrote:
> > I think this has the same effect and issues as #1, right?
> >
> >
> > On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen <chenjiush...@gmail.com>
> > wrote:
> >>
> >> How about increasing the HDFS block size? For example, if the current
> >> value is 128M, we could make it 512M or bigger.
> >>
> >>
> >> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong <dong...@gmail.com>
> >> wrote:
> >>>
> >>> Hi all,
> >>>
> >>> We are trying to use Spark MLlib to train on super large data (100M
> >>> features and 5B rows). The input data in HDFS has ~26K partitions. By
> >>> default, MLlib will create a task for every partition at each
> >>> iteration. But because our dimensionality is also very high, such a
> >>> large number of tasks adds a lot of network overhead for transferring
> >>> the weight vector. So we want to reduce the number of tasks, and we
> >>> have tried the ways below:
> >>>
> >>> 1. Coalesce partitions without shuffling, then cache.
> >>>
> >>> data.coalesce(numPartitions).cache()
> >>>
> >>> This works fine for relatively small data, but when the data grows and
> >>> numPartitions stays fixed, each partition becomes large. This
> >>> introduces two issues: first, larger partitions need larger objects and
> >>> more memory at runtime, and trigger GC more frequently; second, we hit
> >>> the 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by
> >>> a single partition being larger than 2G
> >>> (https://issues.apache.org/jira/browse/SPARK-1391).
> >>>
> >>> 2. Coalesce partitions with shuffling, then cache.
> >>>
> >>> data.coalesce(numPartitions, true).cache()
> >>>
> >>> This mitigates the second issue in #1 to some degree, but the first
> >>> issue is still there, and it also introduces a large amount of
> >>> shuffling.
> >>>
> >>> 3. Cache data first, and coalesce partitions.
> >>>
> >>> data.cache().coalesce(numPartitions)
> >>>
> >>> In this way, the number of cached partitions does not change, but each
> >>> task reads data from multiple partitions. However, I find that tasks
> >>> lose locality this way: I see a lot of 'ANY' tasks, which means those
> >>> tasks read data from other nodes and become slower than tasks reading
> >>> from local memory.
> >>>
> >>> I think the best way should be like #3, but leveraging locality as
> >>> much as possible. Is there any way to do that? Any suggestions?
> >>>
> >>> Thanks!
> >>>
> >>> --
> >>> ZHENG, Xu-dong
> >>>
> >>
> >
> >
> >
> > --
> > 郑旭东
> > ZHENG, Xu-dong
> >
>



-- 
郑旭东
ZHENG, Xu-dong
