Hi Xiangrui, Thanks for your reply!
Yes, our data is very sparse, but RDD.repartition invokes RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has the same effect as #2, right?

As for CombineInputFormat, although I haven't tried it, it sounds like it would combine multiple partitions into one large partition once I cache it, so the same issues as #1? (I've put a rough sketch of what I'm imagining at the bottom of this mail.)

For coalesce, could you share some best practices on how to choose the right number of partitions to avoid locality problems?

Thanks!

On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng <men...@gmail.com> wrote:
> Assuming that your data is very sparse, I would recommend
> RDD.repartition. But if that is not the case and you don't want to
> shuffle the data, you can try a CombineInputFormat and then parse the
> lines into labeled points. Coalesce may cause locality problems if you
> don't use the right number of partitions. -Xiangrui
>
> On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong <dong...@gmail.com>
> wrote:
> > I think this has the same effect and issue as #1, right?
> >
> > On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen <chenjiush...@gmail.com>
> > wrote:
> >>
> >> How about increasing the HDFS block size? E.g., the current value is
> >> 128M; we could make it 512M or bigger.
> >>
> >> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong <dong...@gmail.com>
> >> wrote:
> >>>
> >>> Hi all,
> >>>
> >>> We are trying to use Spark MLlib to train on very large data (100M features
> >>> and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib
> >>> will create a task for every partition at each iteration. But because our
> >>> dimensionality is also very high, such a large number of tasks adds
> >>> significant network overhead to transfer the weight vector. So we want to
> >>> reduce the number of tasks, and we have tried the approaches below:
> >>>
> >>> 1. Coalesce partitions without shuffling, then cache.
> >>>
> >>> data.coalesce(numPartitions).cache()
> >>>
> >>> This works fine for relatively small data, but when the data grows and
> >>> numPartitions is fixed, each partition becomes large. This introduces two
> >>> issues: first, larger partitions need larger objects and more memory at
> >>> runtime, and trigger GC more frequently; second, we hit the 'Size exceeds
> >>> Integer.MAX_VALUE' error, which seems to be caused by a single partition
> >>> growing larger than 2GB (https://issues.apache.org/jira/browse/SPARK-1391).
> >>>
> >>> 2. Coalesce partitions with shuffling, then cache.
> >>>
> >>> data.coalesce(numPartitions, true).cache()
> >>>
> >>> This mitigates the second issue in #1 to some degree, but the first issue
> >>> is still there, and it also introduces a large amount of shuffling.
> >>>
> >>> 3. Cache the data first, then coalesce partitions.
> >>>
> >>> data.cache().coalesce(numPartitions)
> >>>
> >>> In this way, the number of cached partitions does not change, but each task
> >>> reads data from multiple partitions. However, I find that tasks lose
> >>> locality this way: I see a lot of 'ANY' tasks, which means those tasks read
> >>> data from other nodes and become slower than tasks reading from local
> >>> memory.
> >>>
> >>> I think the best way would be something like #3, but leveraging locality as
> >>> much as possible. Is there any way to do that? Any suggestions?
> >>>
> >>> Thanks!
> >>>
> >>> --
> >>> ZHENG, Xu-dong
> >>
> >
> >
> > --
> > 郑旭东
> > ZHENG, Xu-dong
>

--
郑旭东
ZHENG, Xu-dong
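
P.S. To make sure I understand the CombineInputFormat suggestion, here is a rough, untested sketch of what I have in mind. It assumes a Hadoop version that ships org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat and that our input is LibSVM-style text; the path, the 1GB max split size, and the 100M feature dimension are placeholders for our data.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Cap the size of each combined split (placeholder: 1GB), so the ~26K small
// HDFS partitions are merged into far fewer input splits / tasks.
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.maxsize", 1024L * 1024 * 1024)

val numFeatures = 100000000  // placeholder: 100M features

val data = sc.newAPIHadoopFile(
    "hdfs:///path/to/training/data",   // placeholder path
    classOf[CombineTextInputFormat],
    classOf[LongWritable],
    classOf[Text])
  .map { case (_, line) =>
    // parse one LibSVM-style line "<label> <index>:<value> ..." into a
    // sparse LabeledPoint (LibSVM indices are 1-based)
    val parts = line.toString.split(' ')
    val label = parts.head.toDouble
    val (indices, values) = parts.tail.map { item =>
      val Array(i, v) = item.split(':')
      (i.toInt - 1, v.toDouble)
    }.unzip
    LabeledPoint(label, Vectors.sparse(numFeatures, indices, values))
  }
  .cache()

Does that match what you meant? My worry is that each combined split would again become one large cached partition, which is why I suspect it hits the same issues as #1.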